https://gitee.com/xuhx615/rocket-mqdemo.git
Producer
):消息发布者Topic
):topic
用于标识同一类业务类型的消息MessageQueue
):传输和存储消息的容器,是消息的最小存储单元Consumer
):消息订阅者ConsumerGroup
):消息订阅者组,多个消费者之间进行负载均衡消费消息nameServer
:注册中心Broker
:消息中转站,用于接收生产者的消息并持久化,然后发送给对应的topic
https://rocketmq.apache.org
下载rocketmq
安装包和rocketmq
图形化界面rocketmq Dashboard
rocketmq
安装包[root@Centos101 rocketmq]# unzip rocketmq-all-5.1.3-bin-release.zip
namserver
启动脚本runserver
的JVM
内存参数(根据实际服务器资源设置,以下参数为学习时设置的参数) [root@Centos101 bin]# vi runserver.sh
修改前:JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改后:JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
nameserver
[root@Centos101 bin]# ./mqnamesrv &
nameserver
启动日志[root@Centos101 bin]# tail -100f nohup.out
broker
启动脚本runboker
的JVM
内存参数[root@Centos101 bin]# vi runbroker.sh
修改前:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改后:JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
broker.conf
配置文件默认配置
#集群名称
brokerClusterName = DefaultCluster
#broker名称
brokerName = broker-a
#当前节点为主节点(主节点为0)
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
新增以下配置
#自动创建topic
autoCreateTopicEnable = true
#namesrvAddr地址
namesrvAddr = 192.168.113.101:9876
broker
[root@Centos101 bin]# ./mqbroker -c ../conf/broker.conf &
生产者:
[root@Centos101 bin]# export NAMESRV_ADDR='192.168.113.101:9876'
[root@Centos101 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
消费者:
[root@Centos101 bin]# export NAMESRV_ADDR='192.168.113.101:9876'
[root@Centos101 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
broker
[root@Centos101 bin]# sh ./mqshutdown broker
nameserver
[root@Centos101 bin]# sh ./mqshutdown namesrv
⭐主机名配置
192.168.113.101 Centos101
192.168.113.102 Centos102
192.168.113.103 Centos103
⭐免密登录
⭐配置文件配置:
2m-2s-async
:2主2从异步刷盘(吞吐量较大,但消息可能会丢失)当生产者发送消息到主节点,主节点会直接给生产返回收到消息,然后异步同步给从节点2m-2s-sync
:2主2从同步刷盘(吞吐量会下降,但消息会更安全)当生产者发送消息到主节点,主节点会同步同步给从节点,然后才给生产者返回收到消息2m-noslave
:2主无从(单点故障),然后还可以直接配置broker.conf
,进行单点环境配置 Centos101:部署nameserver
Centos102:部署nameserver broker-a,broker-b-s
Centos103:部署nameserver broker-b,broker-a-s
⭐集群启动
nameserver
服务启动nameserver
[root@Centos101 bin]# ./mqnamesrv &
[root@Centos102 bin]# ./mqnamesrv &
[root@Centos103 bin]# ./mqnamesrv &
broker
服务启动在Centos102机器上启动broker(broker-a主节点和broker-b-s从节点)
[root@Centos102 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
[root@Centos102 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
在Centos103机器上启动broker(broker-b主节点和broker-a-s从节点)
[root@Centos103 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
[root@Centos103 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
在Centos102上模拟生产者
[root@Centos102 bin]# export NAMESRV_ADDR='Centos101:9876;Centos102:9876;Centos103:9876'
[root@Centos102 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
在Centos103上模拟消费者
[root@Centos103 bin]# export NAMESRV_ADDR='Centos101:9876;Centos102:9876;Centos103:9876'
[root@Centos103 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
修改application.properties
rocketmq.config.namesrvAddr=Centos101:9876;Centos102:9876;Centos103:9876
修改logback.xml日志路径
引入 Proxy
模块后,Proxy
承担了协议适配、权限管理、消息管理等计算功能,Broker
则更加专注于存储。这样存储和计算相分离,在云原生环境下可以更好地进行资源调度。
[root@Centos102 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-a.properties --enable-proxy &
[root@Centos103 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-b.properties --enable-proxy &
生产者分为同步生产者和异步生产者以及单项生产者
Broker
,等待Broker
返回推送确认,再推送下一个import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
DefaultMQProducer producer = null;
try {
producer = new DefaultMQProducer("syncProducer");
producer.setNamesrvAddr("192.168.113.101:9876");
producer.start();
for (int i = 0; i < 2; i++) {
String body = "Hello zhang " + i;
//参数一:主题、参数二:过滤、参数三:消息内容
Message message = new Message("rocketmq_syncDemo","tag", body.getBytes("UTF-8"));
//同步发送
SendResult result = producer.send(message);
String msgId = result.getMsgId();
SendStatus sendStatus = result.getSendStatus();
logger.info("{}消息发送状态为{}", msgId, sendStatus);
}
} catch (Exception e) {
logger.error("生产者发送消息失败!" ,e);
} finally {
if (producer != null) {
producer.shutdown();
}
}
Broker
,不会等待Broker
返回推送确认,直接推送下一个,但是会回调方法告诉生产者消息是否发送成功。 try {
//异步发送
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
String msgId = sendResult.getMsgId();
SendStatus sendStatus = sendResult.getSendStatus();
logger.info("{}消息发送状态为{}", msgId, sendStatus);
}
@Override
public void onException(Throwable throwable) {
logger.error("消息发送失败", throwable);
}
});
} catch (Exception e) {
logger.error("生产者发送消息失败!" ,e);
} finally {
//异步发送不应该关闭,关闭了便无法回调方法
}
Broker
,不会等待Broke
r返回推送确认,直接推送下一个。 //单向发送
producer.sendOneway(message);
消费者分为推模式和拉模式
⭐推模式:消费者等待Broker
把消息推送过来(被动消费)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
DefaultMQPushConsumer consumer = null;
try {
consumer = new DefaultMQPushConsumer("group_rocketmq_syncDemo");
consumer.setNamesrvAddr("192.168.113.101:9876");
//参数一:topic、参数二:过滤(*表示不过滤)
consumer.subscribe("rocketmq_syncDemo", "*");
//设置消息监听
//MessageListenerConcurrently 并发消费监听
consumer.setMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(item -> {
try {
logger.info("消息消费成功!消息ID={},消息内容:{}", item.getMsgId(), new String(item.getBody(), "UTF-8"));
} catch (Exception e) {
logger.error("消息消费失败!", e);
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//消费者启动
consumer.start();
} catch (MQClientException e) {
logger.error("消费者消费异常!",e);
}
⭐拉模式:消费者主动去Broker
上拉取消息(主动消费)
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
try {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group_rocketmq_asyncDemo");
consumer.setNamesrvAddr("192.168.113.101:9876");
Set<String> topicSet = new HashSet<>();
topicSet.add("rocketmq_asyncDemo");
consumer.setRegisterTopics(topicSet);
consumer.start();
//主题遍历
while (true) {
consumer.getRegisterTopics().forEach(item -> {
try {
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(item);
//消息队列
messageQueues.forEach(item2 -> {
try {
long offset = consumer.getOffsetStore().readOffset(item2, ReadOffsetType.READ_FROM_MEMORY);
if (offset < 0) {
offset = consumer.getOffsetStore().readOffset(item2, ReadOffsetType.READ_FROM_STORE);
}
if (offset < 0) {
offset = consumer.maxOffset(item2);
}
if (offset < 0) {
offset = 0;
}
PullResult result = consumer.pull(item2, "*", offset, 32);
if (result != null) {
switch (result.getPullStatus()) {
case FOUND:{
result.getMsgFoundList().forEach(item3 -> {
try {
logger.info("消息消费成功!消息ID={},消息内容:{}", item3.getMsgId(), new String(item3.getBody(), "UTF-8"));
consumer.updateConsumeOffset(item2, result.getNextBeginOffset());
} catch (Exception e) {
logger.error("遍历消息信息失败!" , e);
}
});
break;
}
case NO_NEW_MSG:{
logger.info("没有最新消息!");
break;
}
case NO_MATCHED_MSG: {
logger.info("没有匹配的消息!");
break;
}
case OFFSET_ILLEGAL: {
logger.error("偏移量非法,当前偏移量为{}", offset);
break;
}
}
}
} catch (Exception e) {
logger.error("遍历消息队列失败!", e);
}
});
} catch (MQClientException e) {
logger.error("遍历主题失败!", e);
}
});
}
} catch (MQClientException e) {
logger.error("消息拉取失败!", e);
}
DefaultLitePullConsumer consumer = null;
try {
consumer = new DefaultLitePullConsumer("group_rocketmq_asyncDemo");
consumer.setNamesrvAddr("192.168.113.101:9876");
consumer.subscribe("rocketmq_asyncDemo", "*");
consumer.start();
while (true) {
List<MessageExt> messageExtList = consumer.poll();
messageExtList.forEach(item -> {
try {
logger.info("获取消息成功!消息队列ID={},消息ID={},消息内容{}", item.getQueueId(),item.getMsgId(), new String(item.getBody(), "UTF-8"));
} catch (Exception e) {
logger.error("获取消息异常!",e);
}
});
}
} catch (MQClientException e) {
logger.error("获取消息异常!",e);
} finally {
if (consumer != null) {
consumer.shutdown();
}
}
//指定第一个消息队列消费
consumer.seek(messageQueueList.get(0), 10);
⭐生产者需要将有序消息发送到同一个队列
⭐消费者push
模式,通过加锁的方式,使得一个队列同时只有一个消费者,每隔一段时间就会延长锁的时间(有超时机制),直到整个队列的消息全部消费
⭐消费者pull
模式,只要消费者自己能保证消息顺序消费就行
⭐消费线程数需设置为1
⭐生产者代码
//i 队列序号
for (int i = 0; i < 5; i++) {
//j 消息序号
for (int j = 0; j < 100; j++) {
Message message = new Message("rocketmq_orderDemo", "tag", ("Hello world!" + j).getBytes("UTF-8"));
producer.send(message, new MessageQueueSelector() {
/**
*
* @param list 队列集合
* @param message 消息 (send函数第一个参数)
* @param o 队列序号 (send函数第三个参数)
* @return 消息队列
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
return list.get(Integer.parseInt(o.toString()));
}
}, i);
}
}
⭐消费者代码
//MessageListenerOrderly有序消息监听(不要使用并发消费监听)
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
list.forEach(item -> {
try {
logger.info("消息接收成功!消息队列={},消息ID={},消息内容={}", item.getQueueId(), item.getMsgId(), new String(item.getBody(), "UTF-8"));
} catch (Exception e) {
logger.error("消息接收异常!", e);
}
});
return ConsumeOrderlyStatus.SUCCESS;
}
});
生产者:
//设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
生产者:
//1-18 对应 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (消费者会跟生产者设置的时间延迟接收消息)
//message.setDelayTimeLevel(3);
//设置自定义时间,单位毫秒
message.setDelayTimeMs(10000L);
⭐优点:减少网络OA
,提高吞吐量
限制:
4M
topic
waitStoreMsgOk
⭐切割消息工具
/**
* 消息集合切割
* 消息大小 = 消息长度 + 主题长度 + 消息自定义属性key长度 + 消息自定义属性val长度 + 20(日志空余)
* @author xuhaixiang
* @date 2023-09-10
*/
public class ListSplitter implements Iterator<List<Message>> {
/**
* 消息大小限制 1MB
*/
private static final int SIZE_LIMIT = 10 * 1000;
/**
* 消息集合
*/
private final List<Message> messageList;
/**
* 当前索引
*/
private int currentIndex;
public ListSplitter(List<Message> messageList) {
this.messageList = messageList;
}
@Override
public boolean hasNext() {
return currentIndex < messageList.size();
}
@Override
public List<Message> next() {
int nextIndex = currentIndex;
int totalSize = 0;
for (; nextIndex < messageList.size(); nextIndex++) {
Message message = messageList.get(nextIndex);
int messageSize = message.getBody().length + message.getTopic().length();
Map<String, String> properties = message.getProperties();
for (String key : properties.keySet()) {
String val = properties.get(key);
messageSize += key.length() + val.length();
}
messageSize += 20;
totalSize += messageSize;
if (totalSize > SIZE_LIMIT) {
nextIndex = nextIndex - 1;
break;
}
}
List<Message> result = messageList.subList(currentIndex, nextIndex);
currentIndex = nextIndex;
return result;
}
}
⭐消息批量发送
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 2000; i++) {
String body = i + "Lorem ipsum dolor sit amet consectetur adipisicing elit. Quasi exercitationem laudantium repellendus quisquam aspernatur est neque quidem vitae nostrum! Quia voluptatibus vitae tempore! Repellendus quam aspernatur, nam neque hic esse!";
//参数一:主题、参数二:过滤、参数三:消息内容
Message message = new Message("rocketmq_syncDemo","tag", body.getBytes("UTF-8"));
messages.add(message);
}
ListSplitter listSplitter = new ListSplitter(messages);
while (listSplitter.hasNext()) {
List<Message> messageList = listSplitter.next();
SendResult result = producer.send(messageList);
String msgId = result.getMsgId();
SendStatus sendStatus = result.getSendStatus();
logger.info("{}消息发送状态为{}", msgId, sendStatus);
}
⭐tag
过滤
String[] tagArr = {"tagA", "tagB", "tagC"};
for (int i = 0; i < 2; i++) {
for (String tag : tagArr) {
String body = tag + ", Hello zhang " + i;
//参数一:主题、参数二:过滤、参数三:消息内容
Message message = new Message("rocketmq_syncDemo",tag, body.getBytes("UTF-8"));
SendResult result = producer.send(message);
String msgId = result.getMsgId();
SendStatus sendStatus = result.getSendStatus();
logger.info("{}消息发送状态为{}", msgId, sendStatus);
}
}
//参数一:topic、参数二:过滤(*表示不过滤),多个tag可以使用||
consumer.subscribe("rocketmq_syncDemo", "tagA || tagC");
⭐SQL
过滤
message.putUserProperty("type", "elg_" + i);
//过滤方式二(注意该sql里面字段是区分大小写的)
//sql过滤方式,borker配置文件必须设置属性enablePropertyFilter=true,并且消费者必须是推模式
//另外消息过滤行为是在broker端进行的,可以提升网络传输性能,但是会增加服务器的压力(将过滤sql推送给broker)
consumer.subscribe("rocketmq_syncDemo", MessageSelector.bySql("TAGS is not null and TAGS in ('tagA','tagC') and type = 'elg_0'"));
⭐事务消息是分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是两个操作一起成功或者一起失败。
⭐事务消息机制的关键是在发送消息时会将消息转为一个half
消息,并存入rocketmq
内部的一个Topic(RMQ_SYS_TRANS_HALF_TOPIC)
,这个topic
对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标topic
,这样消费者就可见了。
⭐事务消息实现原理主要通过两个发送阶段和一个确认阶段来实现
⭐本地事务消息执行器(本地事务执行和本地事务回查,用于向rocketmq
发送提交、回滚、无状态三种结果)
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
* 本地事务实现类
* @author xuhaixiang
* @date 2023-09-11
*/
public class TransactionListenerImpl implements TransactionListener {
/**
* 日志对象
*/
private static final Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);
/**
* 本地事务执行
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String tags = message.getTags();
logger.info("{}本地事务执行", tags);
if ("tagA".equals(tags)) {
//tagA允许发送
return LocalTransactionState.COMMIT_MESSAGE;
}
if ("tagB".equals(tags)) {
//tagB消息回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//其他消息无状态,无状态消息会进行本地事务回查
return LocalTransactionState.UNKNOW;
}
/**
* 本地事务回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String tags = messageExt.getTags();
logger.info("{}本地事务回查", tags);
if ("tagC".equals(tags)) {
//tagC本地事务回查允许发送
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
⭐生产者
TransactionMQProducer producer = null;
try {
producer = new TransactionMQProducer("transactionProductor");
producer.setNamesrvAddr("192.168.113.101:9876");
//开启异步线程,用于异步执行本地事务执行和回查两个动作
ExecutorService service = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS ,new ArrayBlockingQueue<>(20000), new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("transaction");
return thread;
}
});
producer.setExecutorService(service);
//设置本地事务执行器
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
for (int i = 0; i < 10; i++) {
for (String tag : tags) {
Message message = new Message("rocketmq_transactionDemo", tag, (tag + " Hello world!" + i).getBytes("UTF-8"));
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
logger.info("消息发送成功!消息ID={}" + result.getMsgId());
}
}
//让生产者存活一段时间可以回调本地事务执行和本地事务回查
Thread.sleep(60000);
} catch (Exception e) {
logger.error("消息发送异常!", e);
} finally {
if (producer != null) {
producer.shutdown();
}
}
⭐消费者。事务与消费没有任何关系,消费者正常消费消息就行。
消息丢失的几种情况:
broker
,当网络发生异常,消息可能会丢失ack
返回,当我们发现消息发送失败,可以做一个重试机制ack
告诉broker
收到,但是在接下来处理消息时发生了异常,可能会导致消息丢失,消息无法重新消费ack
给broker
broker
存储消息阶段,异步刷盘可能会出现问题导致消息丢失rocketmq
的消息持久化机制是指将消息存储在磁盘上,以确保消息能够可靠存储和检索
rocketmq
消息持久化涉及以下三个角色
CommitLog
消息存储文件
commitLog
,然后返回生产者ack
commitLog
1G
,超过则新开辟一个文件ConsumeQueue
commitLog
当前读取的偏移量、消息大小、tags
值IndexFile
Index
偏移量push
模式,通过加锁的方式,使得一个队列同时只有一个消费者,每隔一段时间就会延长锁的时间(有超时机制),直到整个队列的消息全部消费pull
模式,只要消费者自己能保证消息顺序消费就行