QBM之前使用的消息中间件是ActiveMQ,后续需要升级为RocketMQ。
MQ广泛应用于很多业务场景中,主要的作用
常用MQ中间件对比,参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis
协议和特点 | 消息有序性 | 定时消息 | 批量消息 | 广播消息 | 消息过滤 | 服务器触发的重新投递 | 消息存储 | ||
---|---|---|---|---|---|---|---|---|---|
ActiveMQ | Push model, support OpenWire, STOMP, AMQP, MQTT, JMS | Exclusive (独自)Consumer or Exclusive Queues can ensure ordering | Supported | Not Suppored | Supported | Supported | Not Supported | Supports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB | |
Kafka | Pull model, support TCP | Ensure ordering of messages within a partition | Not Supported | Supported, with async producer | Not Supported | Supported, you can use Kafka Streams to filter messages | Not Supported | High performance file storage | |
RocketMQ | Pull model, support TCP, JMS, OpenMessaging | Ensure strict ordering of messages,and can scale out gracefully | Supported | Supported, with sync mode to avoid message loss | Supported | Supported, property filter expressions based on SQL92 | Supported | High performance and low latency file storage |
通过学习并结合业务知识,重点思考的问题:
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
基于最基础的发布订阅模型,而在实际的应用中,结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区(对应Message Queue),同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
ps:存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。
核心概念
消息类型分类
其他消息相关
参考:
安装RocketMQ参考:https://rocketmq.apache.org/docs/quickStart/01quickstart
容器安装RocketMQ,需要分开安装Nameserver容器和Broker容器以及控制台Console容器,其中Nameserver和Broker的连接通过broker.conf
这样做是为了解耦和方便管理:https://juejin.cn/post/7218438764100108325
开发测试直接使用docker安装
# 拉取镜像
docker pull rocketmqinc/rocketmq
# 一、启动NameServer容器,创建一个新的容器并指定 RocketMQ 的镜像
docker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /home/docker/mydata/rocketmq/conf:/root/config \
-v /home/docker/mydata/rocketmq/logs:/root/logs \
-e "JAVA_OPTS=-Duser.home=/opt" \
rocketmqinc/rocketmq \
sh mqnamesrv
# 参数说明:
-d 以守护线程方式启动
--name rmqnamesrv 设置容器名称
-p 9876:9876 端口映射
-v 把容器内的/root/logs日志路径挂载到宿主机的自定义路径中(需根据自己的路径自行创建)
-v 把容器内的/root/store数据存储目录挂载到宿主机的自定义目录(需根据自己的路径自行创建)
rocketmqinc/rocketmq 使用镜像的名称
sh mqnamesrv 执行name server脚本
# 进入容器
docker exec -it d60b /bin/bash
# 修改broker.conf文件,设置通信的brokerIP
vi ... /conf/broker.conf,然后添加brokerIP1 = xxx.xxx.xxx.xxx,内容为宿主机的IP
# broker.conf的其他配置项
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 回到宿主机,将broker.conf拷贝到宿主机
# nameserver容器内配置文件/opt/rocketmq-4.4.0/conf
docker cp d60b:/opt/rocketmq-4.4.0/conf/broker.conf /home/docker/mydata/rocketmq/conf/broker.conf
# 二、启动Broker容器
docker run -d \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v /home/docker/mydata/rocketmq/broker/logs:/root/logs \
-v /home/docker/mydata/rocketmq/broker/store:/root/store \
-v /home/docker/mydata/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c ../conf/broker.conf
# 参数说明
--link rmqnamesrv:namesrv 和rmqnamesrv容器通信
-p 10911:10911 把容器的非vip通道端口挂载到宿主机
-p 10909:10909 把容器的vip通道端口挂载到宿主机
-e “NAMESRV_ADDR=namesrv:9876” 指定namesrv的地址为本机namesrv的ip地址:9876
-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker 指定broker服务的最大堆内存(暂未配置)
sh mqbroker -c ../conf/broker.conf 读取../conf/broker.conf配置并启动broker
# 三、安装控制台
docker pull styletang/rocketmq-console-ng
docker run -d \
-p 8081:8080 \
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=120.46.82.131:9876 -Drocketmq.config.isVIPChannel=false" \
styletang/rocketmq-console-ng
# 四、访问控制台(别忘了开8081防火墙)
xxx.xxx.xxx.xxx:8081
RocketMQ消息构成分为四部分
什么时候该用Topic?什么时候该用Tag?从下面四点考虑
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
顺序消息场景举例
例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。
顺序消息如何保证严格的消息顺序性?
如果某Broker掉线,此时队列总数会发生变化,如何保证消息顺序性?如果队列数量变化,同一个分片key的消息可能会被发送到不同的队列上,也可能发送到掉线的Broker队列,会发送失败,针对第一种情况,就不能保证消息的顺序性。RocketMQ提供两种模式,如果需要保证严格顺序性而不是可用性,创建Topic指定 --order = true。其次要保证NameServer中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。
事务消息如何保证事务性的?
ps:此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。
package scl.controller;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import scl.mq.RocketMqConstants;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @projectName: learn-projects-demo
* @package: scl.controller
* @className: TestController
* @author: sichaolong
* @description: TODO
* @date: 2023/9/17 12:38
* @version: 1.0
*/
@RestController
public class TestController {
@Autowired
private RocketMQTemplate rocketmqTemplate;
@GetMapping("/template/test")
public SendResult testRocketMqTemplate() {
Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
rocketmqTemplate.send(RocketMqConstants.TOPIC_TEST, msg);
SendResult sendResult = rocketmqTemplate.syncSend(RocketMqConstants.TOPIC_TEST_SYNC, msg);
System.out.println(sendResult);
return sendResult;
}
/**
* 同步发送消息
* @throws Exception
*/
@GetMapping("/client/sync/test")
public void testRocketMqClientSync() throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
//(1)设置NameServer地址
producer.setNamesrvAddr("120.46.82.131:9876");
//(2)启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
/* Topic */,
"TagA"
/* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
//(3)利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg);
//(4)
System.out.printf("%s%n", sendResult);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
/**
* 异步发送消息
* @throws Exception
*/
@GetMapping("/client/async/test")
public void testRocketMqClientAsync() throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
//(1)设置NameServer地址
producer.setNamesrvAddr("120.46.82.131:9876");
//(2)启动producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
/* Topic */,
"TagA"
/* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
countDownLatch.countDown();
}
});
} catch (Exception e) {
e.printStackTrace();
countDownLatch.countDown();
}
}
//异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
countDownLatch.await(5, TimeUnit.SECONDS);
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
/**
* 发送单一消息
* @throws Exception
*/
@GetMapping("/client/once/test")
public void testRocketMqClientOnce() throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
//(1)设置NameServer地址
producer.setNamesrvAddr("120.46.82.131:9876");
//(2)启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
/* Topic */,
"TagA"
/* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
/**
* 发送顺序消息
* @throws Exception
*/
@GetMapping("/client/ordered/test")
public void testRocketMqClientOrdered() throws Exception {
try {
DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
//(1)设置NameServer地址
producer.setNamesrvAddr("120.46.82.131:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
org.apache.rocketmq.common.message.Message msg =
new org.apache.rocketmq.common.message.Message(
"TopicTest",
tags[i % tags.length],
"KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
// 这里传递的orderId,作为分区key
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 发送延时消息
* @throws Exception
*/
@GetMapping("/client/delay/test")
public void testRocketMqClientDelay() throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
//(1)设置NameServer地址
producer.setNamesrvAddr("120.46.82.131:9876");
producer.start();
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
/* Topic */,
"TagA"
/* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// This message will be delivered to consumer 10 seconds later.
msg.setDelayTimeLevel(3);
// Send the message
producer.send(msg);
}
// Shutdown producer after use.
producer.shutdown();
}
/**
* 发送批量消息
* @throws Exception
*/
@GetMapping("/client/batch/test")
public void testRocketMqClientBatch() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
producer.setNamesrvAddr("120.46.82.131:9876");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List< org.apache.rocketmq.common.message.Message> messages = new ArrayList<>();
messages.add(new org.apache.rocketmq.common.message.Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new org.apache.rocketmq.common.message.Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new org.apache.rocketmq.common.message.Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
/**
* 发送事务消息
* @throws Exception
*/
@GetMapping("/client/transaction/test")
public void testRocketMqClientTransaction() throws Exception {
// 线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 事务管理器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("test-client-producer-group");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
for (int i = 0; i < 10; i++) {
try {
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
/* Topic */,
"TagA"
/* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
/**
* 实现事务管理器
*/
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(org.apache.rocketmq.common.message.Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
/**
* 检查事务状态
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
消息消费分配策略:某一个消费者组在集群消费模式下(广播模式下无效),可以配置消息的分配策略, 通过设置消费者的consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()),常见的策略
消息消费位点:每个队列会记录最小位点、最大位点。对于消费组还有消费位点的概念
一般情况下消费位点正常更新,不会存在消息重复,如果某个消费者宕机 or 新的消费者 加入消费者组,在集群模式下,就会触发重平衡,每个消费者之前消费的队列可能发生改变,针对某个队列的最大位点值是由客户端消费之后同步到队列的,过程存在延迟,导致消费者在重新更换消费队列的时候读取该队列脏的消费位点,导致该队列消息消费少量重复。
**消费方式推、拉:**MQ的消费模式可以大致分为两种,一种是Push、一种是Pull。
(1)Push模式并发消费API
// 设置消费者组的当前消费者 为 集群模式(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置消费者组的当前消费者 为 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// Push 模式
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
(2)Push模式顺序消费API:集群模式下,同一消费组的消费者存在并发消费的时候,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。上面代码实现MessageListenerConcurrently接口,保证并发消费,有序性消费如何实现?
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
(3)Push模式的消息重试:只在集群模式下某个Consumer消费失败的消息会被Broker重新投递给其他消费者消费,若达到最大重试次数还没被成功消费,则消息将被投递到死信队列。
// 重试次数
consumer.setMaxReconsumeTimes(10);
// 重试间隔
consumer.setSuspendCurrentQueueTimeMillis(5000);
顺序消费和并发消费重试机制不同,
(4)Push模式的死信队列:消息重试至一定次数仍不被成功消费,被发送到特殊的队列中,称为死信队列Dead-Letter Queue,死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。
疑问:顺序消息进入死信队列影响后续的消息消费吗?
当顺序消息进入死信队列时,RocketMQ会将它标记为无法正常消费的消息,并将其存储在DLQ中。这意味着这些消息将不再参与后续的正常消费流程,但它们并不会直接影响后续的消息。
存在两种Pull模式,一种是比较原始的Pull Consumer,另外一种是Lite Pull Consumer
普通的Pull Consumer API
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
// Pull Consumer 模式
// 指定需要pull的队列,获取topic下的全部队列:Set queueSet = consumer.fetchSubscribeMessageQueues("TopicTest");
// 同一个消费组下的多个LitePullConsumer会负载均衡消费,与PushConsumer一致。
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("TopicTest");
mq.setBrokerName("jinrongtong-MacBook-Pro.local");
long offset = 26;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
System.out.printf("%s%n", pullResult.getMsgFoundList());
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
}
Lite Pull Consumer 的Subscribe API
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
// litePullConsumer默认是自动提交位点
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
Lite Pull Consumer 的Assign API
参卡:4.x官网文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis
Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?
RocketMQ部署架构上主要分为四个部分
小结
// TODO