• 分布式消息队列RocketMQ继承SpringBoot


    一、介绍  

    Springboot 继承RocketMQ:
    1. com.alibaba.cloud
    2. spring-cloud-starter-stream-rocketmq

    底层封装了 rocketmq-client

    参数设置

    application.yml

    1. #rocketmq配置
    2. rocketmq:
    3. name-server: 127.0.0.1:9876
    4. # 生产者配置
    5. producer:
    6. #生产者组名,规定在一个应用里面必须唯一
    7. group: producerGroup
    8. #消息发送的超时时间 默认3000ms
    9. send-message-timeout: 3000
    10. #消息达到4096字节的时候,消息就会被压缩。默认 4096
    11. compress-message-body-threshold: 4096
    12. #最大的消息限制,默认为128K
    13. max-message-size: 4194304
    14. #同步消息发送失败重试次数
    15. retry-times-when-send-failed: 3
    16. #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
    17. retry-next-server: true
    18. #异步消息发送失败重试的次数
    19. retry-times-when-send-async-failed: 3
    20. consumer:
    21. isOnOff: on
    22. # 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
    23. groupName: sampleGroup
    24. # mq的nameserver地址
    25. namesrvAddr: ip:9876
    26. # 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
    27. topic: topic2022
    28. tag: "*"
    29. # 消费者线程数据量
    30. consumeThreadMin: 5
    31. consumeThreadMax: 32
    32. # 设置一次消费信息的条数,默认1
    33. consumeMessageBatchMaxSize: 1

    二、关键类

    1.RocketMQTemplate:提供了各种操作MQ的发放。 
    
    2.RocketMQLocalTransactionListener:本地事务监听器 。
    
    3.RocketMQListener:消费信息监听器 。
    
    4.MessageQueueSelector:消息队列选择策略。
     
    5.DefaultMQProducer:默认的生产者。

    三、生产者方法

    同步(顺序、普通、延迟消息)消息:

    • //发送普通同步消息-Object
    • syncSend(String destination, Object payload)
    • //发送普通同步消息-Message
    • syncSend(String destination, Message message)
    • //发送普通同步消息-Object,并设置发送超时时间
    • syncSend(String destination, Object payload, long timeout)
    • //发送普通同步消息-Message,并设置发送超时时间
    • syncSend(String destination, Message message, long timeout)
    • //发送普通同步延迟消息,并设置超时,这个下文会演示
    • syncSend(String destination, Message message, long timeout, int delayLevel)

    异步消息(普通、延迟消息):

    • //发送普通异步消息-Object
    • asyncSend(String destination, Object payload, SendCallback sendCallback)
    • //发送普通异步消息-Message
    • asyncSend(String destination, Message message, SendCallback sendCallback)
    • //发送普通异步消息-Object,并设置发送超时时间
    • asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
    • //发送普通异步消息-Message,并设置发送超时时间
    • asyncSend(String destination, Message message, SendCallback sendCallback, long timeout)

    单向(顺序、普通、延迟消息)信息:

    •  //发送单向信息--Message
    •  sendOneWay(String destination, Message message)
    • //发送单向信息--Object
    • sendOneWay(String destination, Object payload)
    • //发送单向顺序信息--Message
    • public void sendOneWayOrderly(String destination, Message message, String hashKey)
    • //发送单向顺序信息--Object
    • public void sendOneWayOrderly(String destination, Object payload, String hashKey)

    事务消息:

    //发送事务消息

    public TransactionSendResult sendMessageInTransaction(String txProducerGroup, String destination, Message message, Object arg)

    //取消事务消息

    public void removeTransactionMQProducer(String txProducerGroup)

    带标签tag消息:

    rocketMQTemplate.syncSend(topic+":"+tag, message)

    SQL表达式过滤消息(SQL92过滤):

    需要在broker.conf添加enablePropertyFilter=true 支持sql过滤

    SQL表达式方式可以根据发送消息时输入的属性进行一些计算。

    RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

    • 数字比较,如>,>=,<,<=,BETWEEN,=;
    • 字符比较,如:=,<>,IN;IS NULL or IS NOT NULL;
    • 逻辑运算符:AND, OR, NOT;
    • 常量类型:
    • 数值,如:123, 3.1415;
    • 字符, 如:‘abc’, 必须使用单引号;
    • NULL,特殊常量
    • Boolean, TRUE or FALSE;

    String msg = "sql过滤";
    Message message = MessageBuilder.withPayload(msg).build() ;
     Map headers = new HashMap<>() ;
    headers.put("i", 5) ;
    rocketMQTemplate.convertAndSend("test-sql-topic", message, headers);

    注意:

            消息可以指定过滤类型为tag,则 destination传入格式为:topicName:tagName   

    四、消费者方法

    1.push模式

            消息的生产者将消息发送到broker,然后broker将消息主动推送给订阅了该消息的消费者端。

    1. /**
    2. * 通用消费者
    3. */
    4. @Service
    5. @RocketMQMessageListener(consumerGroup = "common-customer-group", topic = "common_topic", messageModel = MessageModel.CLUSTERING)
    6. public class CommonConsumerListener implements RocketMQListener {
    7. @Override
    8. public void onMessage(Object o) {
    9. System.out.println("通用消费者-----------------"+o.toString());
    10. //消费者处理时抛出异常时就会自动重试
    11. throw new RuntimeException("消费者处理时抛出异常时就会自动重试");
    12. }
    13. }
    14. /**
    15. * consumerGroup: 消费组
    16. * topic:主题
    17. * selectorExpression: 过滤表达式: tag/指明了消息过滤使用SQL92方式
    18. * messageModel:消息模式 集群clustering(每条消息只能有一个消费者进行消费)、广播broadcasting(广播消息,所有订阅者都能收到消息)
    19. */
    20. @Service
    21. @RocketMQMessageListener(consumerGroup = "common-customer-sql-group", topic = "common_topic",selectorType = SelectorType.SQL92 ,selectorExpression = "type='user' or a <7", messageModel = MessageModel.CLUSTERING)
    22. public class SqlConsumerListener implements RocketMQListener {
    23. @Override
    24. public void onMessage(MessageExt message) {
    25. System.out.println("消费消息:"+new String(message.getBody()));
    26. System.out.println("消费消息:"+message.getProperties());
    27. }
    28. }
    29. /**
    30. * consumerGroup: 消费组
    31. * topic:主题
    32. * selectorExpression: 过滤表达式: tag/SQL
    33. * messageModel:消息模式 集群clustering、广播broadcasting
    34. */
    35. @Service
    36. @RocketMQMessageListener(consumerGroup = "common-customer-tag-group", topic = "common_topic",selectorType = SelectorType.TAG ,selectorExpression = "tagA||tagB", messageModel = MessageModel.CLUSTERING)
    37. public class TagConsumerListener implements RocketMQListener {
    38. @Override
    39. public void onMessage(Object o) {
    40. System.out.println(o.toString());
    41. }
    42. }

    注意:

            消费者监听消息,如果抛出异常,则开启重试。 

    • selectorType:指定消息通过的tag的方式,默认为SelectorType.TAG
    • messageModel:指定消息的消费模式,默认为MessageModel.CLUSTERING模式每条消息只能由一个消费者消费,而MessageModel.BROADCASTING模式为广播模式,所有订阅者都能消费。
    • selectorExpression : 采用rocketMQ支持的表达式,如Tag消息能够被消费,多个采用||分割。SQL消息采用SQL表达式过滤消息。
    • consumerGroup: 消费组
    • topic:主题

    2.pull模式

            消息生产者将消息发送到broker上,然后由消费者自发的去broker去拉取消息。

    五、事务消息

            RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致本质是两阶段提交。

    1.分布式事务
            分布式事务通俗来说,一次操作由若干分支组成,这些分支操作分属于不同的应用,分布在不同的服务器上,分布式事务需要保证这些操作要么全部成功,要么全部失败,分支事务与普通事务一样,就是为了保证操作结果的一致性。

    2.事务消息
            要么同时成功,要么同时失败。

    3.半事务消息
            暂存投递的消息,生产者已经成功地将消息发送到了消息队列RocketMQ版服务端,但是消息队列RocketMQ版服务端未收到生产者对消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

    4.消息回查
            由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。消息事务重试

    5.本地事务状态
            生产者回调操作执行的结果为本地事务状态。其会发送给事务协调者,而协调者会在发送给事务管理者,事务管理者会根据协调者发送过来的本地事务状态来决定全局事务确认指令。该句话:具体请查看阿里云 Seata 分布式事务

    RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。
    RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
    RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
    RocketMQLocalTransactionState.UNKNOWN: 未知状态,它代表需要检查消息队列来确定状态。调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息.

    RocketMQ事务消息通过异步确保方式,保证事务的最终一致性

     

     事务消息发送步骤如下:

    1. 生产者将半事务消息发送至消息队列RocketMQ服务端
    2. 消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息
    3. 生产者开始执行本地事务逻辑。
    4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
      1.  二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
      2. 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
    5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查

    事务消息回查步骤如下:

    1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
    3. 回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
    4. 第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。

    备注:本地事务的回滚依赖于本地DB的ACID特性,订阅方的成功消费由 MQ Server 的失败重试机制进行保证。

    六、案例展示

    工具类:

    1. package com.rocketmq.service.config;
    2. import org.apache.rocketmq.client.producer.SendCallback;
    3. import org.apache.rocketmq.client.producer.SendResult;
    4. import org.apache.rocketmq.client.producer.TransactionSendResult;
    5. import org.apache.rocketmq.spring.core.RocketMQTemplate;
    6. import org.slf4j.Logger;
    7. import org.slf4j.LoggerFactory;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.lang.Nullable;
    10. import org.springframework.messaging.Message;
    11. import org.springframework.messaging.MessagingException;
    12. import org.springframework.messaging.core.MessagePostProcessor;
    13. import org.springframework.stereotype.Component;
    14. import javax.annotation.PostConstruct;
    15. import javax.annotation.PreDestroy;
    16. import java.util.Collection;
    17. import java.util.Map;
    18. /**
    19. * @Author zhw
    20. * @Description rocketMQ 封装API
    21. * @Date 14:53 2022/9/7
    22. **/
    23. @Component
    24. public class RocketMqBuilder {
    25. /**
    26. * 日志
    27. */
    28. private static final Logger LOG = LoggerFactory.getLogger(RocketMqBuilder.class);
    29. /**
    30. * rocketmq模板注入
    31. */
    32. @Autowired
    33. private RocketMQTemplate rocketMQTemplate;
    34. @PostConstruct
    35. public void init() {
    36. LOG.info("---RocketMq助手初始化---");
    37. }
    38. //==================同步消息开始===========================//
    39. /**
    40. * 发送普通同步消息-Object
    41. *
    42. * @param topic 主题
    43. * @param message 消息
    44. */
    45. public SendResult syncSendMessage(String topic, Object message) {
    46. return rocketMQTemplate.syncSend(topic, message);
    47. }
    48. /**
    49. * 发送普通同步消息-Message
    50. *
    51. * @param topic 主题
    52. * @param message 消息
    53. */
    54. public SendResult syncSendMessage(String topic, Message message) {
    55. return rocketMQTemplate.syncSend(topic, message);
    56. }
    57. /**
    58. * 发送普通同步消息-Object,并设置发送超时时间
    59. *
    60. * @param topic 主题
    61. * @param message 消息
    62. * @param timeout 超时时间
    63. */
    64. public SendResult syncSendMessageTimeOut(String topic, Object message, long timeout) {
    65. return rocketMQTemplate.syncSend(topic, message, timeout);
    66. }
    67. /**
    68. * 发送普通同步消息-Message,并设置发送超时时间
    69. *
    70. * @param topic 主题
    71. * @param message 消息
    72. * @param timeout 超时时间
    73. */
    74. public SendResult syncSendMessageTimeOut(String topic, Message message, long timeout) {
    75. return rocketMQTemplate.syncSend(topic, message, timeout);
    76. }
    77. /**
    78. * 发送普通同步延迟消息,并设置超时
    79. *
    80. * @param topic 主题
    81. * @param message 消息
    82. * @param timeout 超时时间
    83. * @param delayLevel 延迟级别
    84. */
    85. public SendResult syncSendMessageTimeOut(String topic, Message message, long timeout, int delayLevel) {
    86. return rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
    87. }
    88. /**
    89. * 发送批量普通同步消息
    90. *
    91. * @param topic 主题
    92. * @param messages 多个消息集合
    93. */
    94. public SendResult syncBtachSendMessage(String topic, Collection messages) {
    95. return rocketMQTemplate.syncSend(topic, messages);
    96. }
    97. /**
    98. * 发送批量普通同步消息,并设置发送超时时间
    99. *
    100. * @param topic 主题
    101. * @param messages 多个消息集合
    102. */
    103. public SendResult syncBtachSendMessage(String topic, Collection messages, long timeout) {
    104. return rocketMQTemplate.syncSend(topic, messages, timeout);
    105. }
    106. /**
    107. * 发送同步 顺序消息-Object
    108. *
    109. * @param topic 主题
    110. * @param message 消息
    111. * @param hashKey 标识
    112. */
    113. public SendResult syncSendMessage(String topic, Object message, String hashKey) {
    114. return rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    115. }
    116. /**
    117. * 发送同步 顺序消息-Object 并设置发送超时时间
    118. *
    119. * @param topic 主题
    120. * @param message 消息
    121. * @param hashKey 标识
    122. */
    123. public SendResult syncSendMessage(String topic, Object message, String hashKey, long timeout) {
    124. return rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    125. }
    126. /**
    127. * 发送同步 顺序消息-Message 并设置发送超时时间
    128. *
    129. * @param topic 主题
    130. * @param message 消息
    131. * @param hashKey 标识
    132. */
    133. public SendResult syncSendMessage(String topic, Message message, String hashKey) {
    134. return rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    135. }
    136. /**
    137. * 发送同步 顺序消息-Message 并设置发送超时时间
    138. *
    139. * @param topic 主题
    140. * @param message 消息
    141. * @param hashKey 标识
    142. * @param timeout 超时时间
    143. */
    144. public SendResult syncSendMessage(String topic, Message message, String hashKey, long timeout) {
    145. return rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    146. }
    147. //==================同步消息结束===========================//
    148. //==================异步消息开始===========================//
    149. /**
    150. * 发送普通异步消息-Object
    151. *
    152. * @param topic 主题
    153. * @param message 消息
    154. */
    155. public void asyncSendMessage(String topic, Object message, SendCallback sendCallback) {
    156. rocketMQTemplate.asyncSend(topic, message, sendCallback != null ? sendCallback : getDefaultSendCallBack());
    157. }
    158. /**
    159. * 发送普通异步消息-Message
    160. *
    161. * @param topic 主题
    162. * @param message 消息
    163. */
    164. public void asyncSendMessage(String topic, Message message, SendCallback sendCallback) {
    165. rocketMQTemplate.asyncSend(topic, message, sendCallback != null ? sendCallback : getDefaultSendCallBack());
    166. }
    167. /**
    168. * 发送普通异步消息-Object,并设置发送超时时间
    169. *
    170. * @param topic 主题
    171. * @param message 消息
    172. * @param timeout 超时时间
    173. */
    174. public void asyncSendMessage(String topic, Object message, SendCallback sendCallback, long timeout) {
    175. rocketMQTemplate.asyncSend(topic, message, sendCallback != null ? sendCallback : getDefaultSendCallBack(), timeout);
    176. }
    177. /**
    178. * 发送普通异步消息-Message,并设置发送超时时间
    179. *
    180. * @param topic 主题
    181. * @param message 消息
    182. */
    183. public void asyncSendMessage(String topic, Message message, SendCallback sendCallback, long timeout) {
    184. rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    185. }
    186. /**
    187. * 发送普通异步消息-Message,并设置发送超时时间
    188. *
    189. * @param topic 主题
    190. * @param message 消息
    191. */
    192. public void asyncSendMessage(String topic, Message message, long timeout) {
    193. rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack(), timeout);
    194. }
    195. /**
    196. * 发送异步 顺序消息-Object
    197. *
    198. * @param topic 主题
    199. * @param message 消息
    200. * @param hashKey 标识
    201. */
    202. public void asyncSendMessage(String topic, Object message, String hashKey, SendCallback sendCallback) {
    203. rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack());
    204. }
    205. /**
    206. * 发送异步 顺序消息-Object 并设置发送超时时间
    207. *
    208. * @param topic 主题
    209. * @param message 消息
    210. * @param hashKey 标识
    211. * @param timeout 超时时间
    212. */
    213. public void asyncSendMessage(String topic, Object message, String hashKey, SendCallback sendCallback, long timeout) {
    214. rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack(), timeout);
    215. }
    216. /**
    217. * 发送异步 顺序消息-Message 并设置发送超时时间
    218. *
    219. * @param topic 主题
    220. * @param message 消息
    221. * @param hashKey 标识
    222. */
    223. public void syncSendMessage(String topic, Message message, String hashKey, SendCallback sendCallback) {
    224. rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack());
    225. }
    226. /**
    227. * 发送异步 顺序消息-Message 并设置发送超时时间
    228. *
    229. * @param topic 主题
    230. * @param message 消息
    231. * @param hashKey 标识
    232. * @param timeout 超时时间
    233. */
    234. public void syncSendMessage(String topic, Message message, String hashKey, SendCallback sendCallback, long timeout) {
    235. rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack(), timeout);
    236. }
    237. //==================异步消息结束===========================//
    238. //==================单向消息开始===========================//
    239. /**
    240. * 单向普通消息-Object
    241. *
    242. * @param topic 主题
    243. * @param message 消息
    244. */
    245. public void sendOneWay(String topic, Object message) {
    246. rocketMQTemplate.sendOneWay(topic, message);
    247. }
    248. /**
    249. * 单向普通消息-Message
    250. *
    251. * @param topic 主题
    252. * @param message 消息
    253. */
    254. public void sendOneWay(String topic, Message message) {
    255. rocketMQTemplate.sendOneWay(topic, message);
    256. }
    257. /**
    258. * 单向 顺序消息-Object
    259. *
    260. * @param topic 主题
    261. * @param message 消息
    262. */
    263. public void sendOneWay(String topic, Object message, String hashKey) {
    264. rocketMQTemplate.sendOneWayOrderly(topic, message, hashKey);
    265. }
    266. /**
    267. * 单向 顺序消息-Message
    268. *
    269. * @param topic 主题
    270. * @param message 消息
    271. */
    272. public void sendOneWay(String topic, Message message, String hashKey) {
    273. rocketMQTemplate.sendOneWayOrderly(topic, message, hashKey);
    274. }
    275. //==================单向消息结束===========================//
    276. /**
    277. * 开启 事务
    278. *
    279. * @param topic 主题
    280. * @param message 消息
    281. */
    282. public TransactionSendResult sendMessageInTransaction(String txProducerGroup, String topic, Message message, Object value) {
    283. return rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, value);
    284. }
    285. /**
    286. * 移除 事务
    287. *
    288. * @param txProducerGroup 事务生产组
    289. */
    290. public void sendMessageInTransaction(String txProducerGroup) {
    291. rocketMQTemplate.removeTransactionMQProducer(txProducerGroup);
    292. }
    293. /**
    294. * 默认CallBack函数
    295. *
    296. * @return
    297. */
    298. private SendCallback getDefaultSendCallBack() {
    299. return new SendCallback() {
    300. @Override
    301. public void onSuccess(SendResult sendResult) {
    302. LOG.info("---发送MQ成功---");
    303. }
    304. @Override
    305. public void onException(Throwable throwable) {
    306. LOG.error("---发送MQ失败---" + throwable.getMessage(), throwable.getMessage());
    307. }
    308. };
    309. }
    310. @PreDestroy
    311. public void destroy() {
    312. LOG.info("---RocketMq助手注销---");
    313. }
    314. /**
    315. * @param topic 主题
    316. * @param message 消息
    317. */
    318. public void convertAndSend(String topic, Message message) {
    319. rocketMQTemplate.convertAndSend(topic, message);
    320. }
    321. /**
    322. * @param topic 主题
    323. * @param message 消息
    324. */
    325. public void convertAndSend(String topic, Message message,Map headMap) {
    326. rocketMQTemplate.convertAndSend(topic, message,headMap);
    327. }
    328. }

    前提:

    1. @Autowired
    2. private RocketMqBuilder rocketMqUtils;
    3. private static final String topic = "common_topic";
    4. private static final String asyncTopic = "common_async_topic";
    5. private static final String delayTopic = "common_delay_topic";
    6. private static final String topicOrder = "topic_orderly";
    7. private static final String syncTopicOrder = "sync_test_topic_orderly";
    8. private static final String oneTopicOrder = "one_test_topic_orderly";
    9. /*事务 消费主题 消费组 */
    10. private static final String txTopic = "common-topic-tx";
    11. private static final String txGroup = "common-tx-group";

    1.普通消息

    生产者:

    1. /**
    2. * 发送普通批量消息
    3. *
    4. * @return
    5. */
    6. @RequestMapping("/sendBatchMessage")
    7. public SendStatus sendBatchMessage() {
    8. List msgs = new ArrayList();
    9. for (int i = 0; i < 10; i++) {
    10. msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
    11. }
    12. SendResult sendResult = rocketMqUtils.syncSendMessage(topic, msgs);
    13. return sendResult.getSendStatus();
    14. }
    15. /**
    16. * 发送普通消息
    17. *
    18. * @param message
    19. * @return
    20. */
    21. @RequestMapping("/sendCommonMessage")
    22. public SendStatus sendCommonMessage(String message) {
    23. SendResult sendResult = rocketMqUtils.syncSendMessage(topic, message);
    24. return sendResult.getSendStatus();
    25. }
    26. /**
    27. * 发送普通消息
    28. *
    29. * @param message
    30. * @return
    31. */
    32. @RequestMapping("/sendCommonMessageOne")
    33. public SendStatus sendCommonMessageOne(String message) {
    34. SendResult sendResult = rocketMqUtils.syncSendMessage(topic, MessageBuilder.withPayload(message).build());
    35. return sendResult.getSendStatus();
    36. }

     消费者:

    1. import org.apache.rocketmq.common.message.MessageExt;
    2. import org.apache.rocketmq.spring.annotation.MessageModel;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.core.RocketMQListener;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * 通用消费者
    8. */
    9. @Service
    10. @RocketMQMessageListener(consumerGroup = "common-customer-group", topic = "common_topic", messageModel = MessageModel.CLUSTERING)
    11. public class CommonConsumerListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(MessageExt messageExt) {
    14. System.out.println("通用消费者-----------------"+messageExt.toString());
    15. //消费者处理时抛出异常时就会自动重试
    16. // throw new RuntimeException("消费者处理时抛出异常时就会自动重试");
    17. }
    18. }

    2.异步消息

    生产者:

    1. /**
    2. * 发送异步消息
    3. *
    4. * @param message
    5. * @return
    6. */
    7. @RequestMapping("/sendAsyncCommonMessage")
    8. public String sendAsyncCommonMessage(String message) {
    9. rocketMqUtils.asyncSendMessage(asyncTopic, message, new SendCallback() {
    10. @Override
    11. public void onSuccess(SendResult sendResult) {
    12. log.info("异步消息发送成功:{}", sendResult);
    13. }
    14. @Override
    15. public void onException(Throwable throwable) {
    16. log.info("异步消息发送失败:{}", throwable.getMessage());
    17. }
    18. });
    19. return "ok";
    20. }
    21. /**
    22. * 发送异步消息
    23. *
    24. * @param message
    25. * @return
    26. */
    27. @RequestMapping("/sendAsyncCommonMessageOne")
    28. public String sendAsyncCommonMessageOne(String message) {
    29. rocketMqUtils.asyncSendMessage(asyncTopic, MessageBuilder.withPayload(message).build(), new SendCallback() {
    30. @Override
    31. public void onSuccess(SendResult sendResult) {
    32. log.info("异步消息发送成功:{}", sendResult);
    33. }
    34. @Override
    35. public void onException(Throwable throwable) {
    36. log.info("异步消息发送失败:{}", throwable.getMessage());
    37. }
    38. });
    39. return "ok";
    40. }

    消费者:

    1. import org.apache.rocketmq.common.message.MessageExt;
    2. import org.apache.rocketmq.spring.annotation.MessageModel;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.core.RocketMQListener;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * 异步消费
    8. */
    9. @Service
    10. @RocketMQMessageListener(consumerGroup = "common-customer-async-group", topic = "common_async_topic", messageModel = MessageModel.CLUSTERING)
    11. public class CommonConsumerAsyncMessageListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(MessageExt messageExt) {
    14. System.out.println("异步消费-----------------"+messageExt.toString());
    15. //消费者处理时抛出异常时就会自动重试
    16. // throw new RuntimeException("消费者处理时抛出异常时就会自动重试");
    17. }
    18. }

    3.顺序消息

    生产者:

    1. /**
    2. * 顺序信息的三种方式:同步
    3. *
    4. * @return
    5. */
    6. @RequestMapping("/sendSyncOrderMessage")
    7. public String sendSyncOrderMessage() {
    8. //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
    9. //参数二:消息内容
    10. //参数三:hashKey 用来计算决定消息发送到哪个消息队列
    11. String message = "orderly message: ";
    12. for (int i = 0; i < 10; i++) {
    13. // 模拟有序发送消息
    14. rocketMqUtils.syncSendMessage(topicOrder, message + i, "select_queue_key");
    15. }
    16. return "ok";
    17. }
    18. /**
    19. * 顺序信息的三种方式:异步
    20. *
    21. * @return
    22. */
    23. @RequestMapping("/sendASyncOrderMessage")
    24. public String sendASyncOrderMessage() {
    25. //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
    26. //参数二:消息内容
    27. //参数三:hashKey 用来计算决定消息发送到哪个消息队列, 一般是订单ID,产品ID等
    28. rocketMqUtils.asyncSendMessage(syncTopicOrder, "111111创建", "111111", new SendCallback() {
    29. @Override
    30. public void onSuccess(SendResult sendResult) {
    31. log.info("异步消息发送成功:{}", sendResult);
    32. }
    33. @Override
    34. public void onException(Throwable throwable) {
    35. log.info("异步消息发送失败:{}", throwable.getMessage());
    36. }
    37. });
    38. rocketMqUtils.asyncSendMessage(syncTopicOrder, "111111支付", "111111", new SendCallback() {
    39. @Override
    40. public void onSuccess(SendResult sendResult) {
    41. log.info("异步消息发送成功:{}", sendResult);
    42. }
    43. @Override
    44. public void onException(Throwable throwable) {
    45. log.info("异步消息发送失败:{}", throwable.getMessage());
    46. }
    47. });
    48. rocketMqUtils.asyncSendMessage(syncTopicOrder, "111111完成", "111111", new SendCallback() {
    49. @Override
    50. public void onSuccess(SendResult sendResult) {
    51. log.info("异步消息发送成功:{}", sendResult);
    52. }
    53. @Override
    54. public void onException(Throwable throwable) {
    55. log.info("异步消息发送失败:{}", throwable.getMessage());
    56. }
    57. });
    58. rocketMqUtils.asyncSendMessage(syncTopicOrder, "222222创建", "222222", new SendCallback() {
    59. @Override
    60. public void onSuccess(SendResult sendResult) {
    61. log.info("异步消息发送成功:{}", sendResult);
    62. }
    63. @Override
    64. public void onException(Throwable throwable) {
    65. log.info("异步消息发送失败:{}", throwable.getMessage());
    66. }
    67. });
    68. rocketMqUtils.asyncSendMessage(syncTopicOrder, "222222支付", "222222", new SendCallback() {
    69. @Override
    70. public void onSuccess(SendResult sendResult) {
    71. log.info("异步消息发送成功:{}", sendResult);
    72. }
    73. @Override
    74. public void onException(Throwable throwable) {
    75. log.info("异步消息发送失败:{}", throwable.getMessage());
    76. }
    77. });
    78. rocketMqUtils.asyncSendMessage(syncTopicOrder, "222222完成", "222222", new SendCallback() {
    79. @Override
    80. public void onSuccess(SendResult sendResult) {
    81. log.info("异步消息发送成功:{}", sendResult);
    82. }
    83. @Override
    84. public void onException(Throwable throwable) {
    85. log.info("异步消息发送失败:{}", throwable.getMessage());
    86. }
    87. });
    88. return "ok";
    89. }
    90. /**
    91. * 顺序信息的三种方式:单向
    92. *
    93. * @return
    94. */
    95. @RequestMapping("/sendOneWayMessage")
    96. public String sendOneWayMessage() {
    97. //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
    98. //参数二:消息内容
    99. //参数三:hashKey 用来计算决定消息发送到哪个消息队列
    100. String message = "one orderly message: ";
    101. for (int i = 0; i < 10; i++) {
    102. // 模拟有序发送消息
    103. rocketMqUtils.sendOneWay(oneTopicOrder, message + i, "select_queue_key");
    104. }
    105. return "ok";
    106. }

    消费者:

    1. import org.apache.rocketmq.common.message.MessageExt;
    2. import org.apache.rocketmq.spring.annotation.ConsumeMode;
    3. import org.apache.rocketmq.spring.annotation.MessageModel;
    4. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Service;
    7. /**
    8. * 顺序信息的三种方式:同步
    9. * 并发消费模式(ConsumeMode.CONCURRENTLY)
    10. * ConsumeMode.ORDERLY:顺序消费
    11. */
    12. @Service
    13. @RocketMQMessageListener(consumerGroup = "customer-orderly-group", topic = "topic_orderly" ,consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)
    14. public class CommonOrderConsumerListener implements RocketMQListener {
    15. @Override
    16. public void onMessage(MessageExt messageExt) {
    17. System.out.println("顺序信息的三种方式:同步-----------------"+messageExt.toString());
    18. }
    19. }
    20. import org.apache.rocketmq.common.message.MessageExt;
    21. import org.apache.rocketmq.spring.annotation.ConsumeMode;
    22. import org.apache.rocketmq.spring.annotation.MessageModel;
    23. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    24. import org.apache.rocketmq.spring.core.RocketMQListener;
    25. import org.springframework.stereotype.Service;
    26. /**
    27. * 顺序信息的三种方式:异步
    28. * 并发消费模式(ConsumeMode.CONCURRENTLY)
    29. * ConsumeMode.ORDERLY:顺序消费
    30. */
    31. @Service
    32. @RocketMQMessageListener(consumerGroup = "common-customer-sync-orderly-group", topic = "sync_test_topic_orderly" ,consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)
    33. public class CommonSyncOrderConsumerListener implements RocketMQListener {
    34. @Override
    35. public void onMessage(MessageExt messageExt) {
    36. System.out.println("顺序信息的三种方式:异步-----------------"+messageExt.toString());
    37. }
    38. }
    39. import org.apache.rocketmq.common.message.MessageExt;
    40. import org.apache.rocketmq.spring.annotation.ConsumeMode;
    41. import org.apache.rocketmq.spring.annotation.MessageModel;
    42. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    43. import org.apache.rocketmq.spring.core.RocketMQListener;
    44. import org.springframework.stereotype.Service;
    45. /**
    46. * 顺序信息的三种方式:单向
    47. * 并发消费模式(ConsumeMode.CONCURRENTLY)
    48. * ConsumeMode.ORDERLY:顺序消费
    49. */
    50. @Service
    51. @RocketMQMessageListener(consumerGroup = "common-customer-one-orderly-group", topic = "one_test_topic_orderly",consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)
    52. public class CommonOneOrderConsumerListener implements RocketMQListener {
    53. @Override
    54. public void onMessage(MessageExt messageExt) {
    55. System.out.println("顺序信息的三种方式:单向-----------------"+messageExt.toString());
    56. }
    57. }

    4.延迟消息

    生产者:

    1. /**
    2. * 同步延迟消息
    3. * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
    4. * RocketMQ 目前只支持固定精度的定时消息。
    5. * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    6. * 延迟的底层方法是用定时任务实现的。
    7. *
    8. * @param message
    9. * @return
    10. */
    11. @RequestMapping("/sendSyncDelayMessage")
    12. public SendStatus sendSyncDelayMessage(String message) {
    13. Message messageData = MessageBuilder.withPayload(message + new Date()).build();
    14. /**
    15. * @param destination formats: `topicName:tags`
    16. * @param message 消息体
    17. * @param timeout 发送超时时间
    18. * @param delayLevel 延迟级别 1到18
    19. * @return {@link SendResult}
    20. */
    21. SendResult sendResult = rocketMqUtils.syncSendMessageTimeOut(delayTopic, messageData, 3000, 3);
    22. return sendResult.getSendStatus();
    23. }
    24. /**
    25. * 异步延迟消息
    26. *
    27. * @param message
    28. * @return
    29. */
    30. @RequestMapping("/sendASyncDelayMessage")
    31. public String sendASyncDelayMessage(String message) {
    32. Message messageData = MessageBuilder.withPayload(message + new Date()).build();
    33. SendCallback sc = new SendCallback() {
    34. @Override
    35. public void onSuccess(SendResult sendResult) {
    36. log.info("发送异步延时消息成功");
    37. }
    38. @Override
    39. public void onException(Throwable throwable) {
    40. log.info("发送异步延时消息失败:{}", throwable.getMessage());
    41. }
    42. };
    43. /**
    44. * @param destination formats: `topicName:tags`
    45. * @param message 消息体
    46. * @param timeout 发送超时时间
    47. * @param delayLevel 延迟级别 1到18
    48. * @return {@link SendResult}
    49. */
    50. rocketMqUtils.asyncSendMessage(delayTopic, messageData, sc, 3000);
    51. return "ok";
    52. }

    消费者:

    1. import org.apache.rocketmq.common.message.MessageExt;
    2. import org.apache.rocketmq.spring.annotation.MessageModel;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.core.RocketMQListener;
    5. import org.springframework.stereotype.Service;
    6. /**
    7. * 延迟消息
    8. */
    9. @Service
    10. @RocketMQMessageListener(consumerGroup = "common-customer-delay-group", topic = "common_delay_topic", messageModel = MessageModel.CLUSTERING)
    11. public class CommonDelayConsumerListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(MessageExt messageExt) {
    14. System.out.println("延迟消息-----------------" + messageExt.toString());
    15. //消费者处理时抛出异常时就会自动重试
    16. // throw new RuntimeException("消费者处理时抛出异常时就会自动重试");
    17. }
    18. }

    5.tag消息

    生产者:

    1. /**
    2. * 发送普通消息-带tag
    3. *
    4. * @param message
    5. * @return
    6. */
    7. @RequestMapping("/sendCommonMessageByTag")
    8. public SendStatus sendCommonMessage(String message, String tag) {
    9. SendResult sendResult = rocketMqUtils.syncSendMessage(topic + ":" + tag, message);
    10. return sendResult.getSendStatus();
    11. }

    消费者:

    1. import org.apache.rocketmq.common.message.MessageExt;
    2. import org.apache.rocketmq.spring.annotation.MessageModel;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.annotation.SelectorType;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Service;
    7. /**
    8. * consumerGroup: 消费组
    9. * topic:主题
    10. * selectorExpression: 过滤表达式: tag/SQL
    11. * messageModel:消息模式 集群clustering、广播broadcasting
    12. */
    13. @Service
    14. @RocketMQMessageListener(consumerGroup = "common-customer-tag-group", topic = "common_topic",selectorType = SelectorType.TAG ,selectorExpression = "tagA||tagB", messageModel = MessageModel.CLUSTERING)
    15. public class TagConsumerListener implements RocketMQListener {
    16. @Override
    17. public void onMessage(MessageExt messageExt) {
    18. System.out.println(messageExt.toString());
    19. }
    20. }

    6.sql过滤消息

    生产者:

    1. /**
    2. * 发送普通消息-带sql
    3. *
    4. * @param message
    5. * @return
    6. */
    7. @RequestMapping("/sendCommonMessageBySql")
    8. public String sendCommonMessageBySql(String message) {
    9. Map headers = new HashMap<>();
    10. headers.put("type", "user");
    11. headers.put("a", 6);
    12. rocketMqUtils.convertAndSend(topic, MessageBuilder.withPayload(message).build(), headers);
    13. return "ok";
    14. }

    消费者:

    1. import org.apache.rocketmq.common.message.MessageExt;
    2. import org.apache.rocketmq.spring.annotation.MessageModel;
    3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    4. import org.apache.rocketmq.spring.annotation.SelectorType;
    5. import org.apache.rocketmq.spring.core.RocketMQListener;
    6. import org.springframework.stereotype.Service;
    7. /**
    8. * consumerGroup: 消费组
    9. * topic:主题
    10. * selectorExpression: 过滤表达式: tag/指明了消息过滤使用SQL92方式
    11. * messageModel:消息模式 集群clustering(每条消息只能有一个消费者进行消费)、广播broadcasting(广播消息,所有订阅者都能收到消息)
    12. */
    13. @Service
    14. @RocketMQMessageListener(consumerGroup = "common-customer-sql-group", topic = "common_topic",selectorType = SelectorType.SQL92 ,selectorExpression = "type='user' or a <7", messageModel = MessageModel.CLUSTERING)
    15. public class SqlConsumerListener implements RocketMQListener {
    16. @Override
    17. public void onMessage(MessageExt message) {
    18. System.out.println("消费消息:"+new String(message.getBody()));
    19. System.out.println("消费消息:"+message.getProperties());
    20. }
    21. }

    7.事务消息

    生产者:

    1. /**
    2. * 事务消息
    3. *
    4. * @return
    5. */
    6. @RequestMapping("/sendTxMessage")
    7. public String sendTxMessage() {
    8. String[] tags = {"a", "b", "c"};
    9. for (int i = 0; i < 3; i++) {
    10. Message message = MessageBuilder.withPayload("事务消息===>" + i).setHeader("rocketmq_tags", tags[i]).build();
    11. //发送半事务消息
    12. TransactionSendResult res = rocketMqUtils.sendMessageInTransaction(txGroup, txTopic + ":" + tags[i], message, i + 1);
    13. if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
    14. log.info("事物消息发送成功");
    15. }
    16. log.info("事物消息发送结果:{}", res);
    17. }
    18. return "ok";
    19. }

    生产者监听器:

    1. import com.alibaba.druid.util.StringUtils;
    2. import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    3. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    4. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    5. import org.slf4j.Logger;
    6. import org.slf4j.LoggerFactory;
    7. import org.springframework.messaging.Message;
    8. import org.springframework.stereotype.Component;
    9. /**
    10. 生产者消息监听器:
    11. * 用于监听本地事务执行的状态和检查本地事务状态。
    12. */
    13. @Component
    14. @RocketMQTransactionListener(txProducerGroup = "common-tx-group")
    15. public class TransactionListener implements RocketMQLocalTransactionListener {
    16. private static final Logger log = LoggerFactory.getLogger("TransactionListener");
    17. @Override
    18. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    19. // 执行本地事务
    20. String tag = String.valueOf(msg.getHeaders().get("rocketmq_tags"));
    21. if (StringUtils.equals("a", tag)){
    22. //这里只讲TAGA消息提交,状态为可执行
    23. return RocketMQLocalTransactionState.COMMIT;
    24. }else if (StringUtils.equals("b", tag)) {
    25. return RocketMQLocalTransactionState.ROLLBACK;
    26. } else if (StringUtils.equals("c",tag)) {
    27. return RocketMQLocalTransactionState.UNKNOWN;
    28. }
    29. return RocketMQLocalTransactionState.UNKNOWN;
    30. }
    31. //mq回调检查本地事务执行情况
    32. @Override
    33. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    34. log.info("checkLocalTransaction===>{}",msg);
    35. return RocketMQLocalTransactionState.COMMIT;
    36. }
    37. }

    消费者:

    1. import org.apache.rocketmq.spring.annotation.MessageModel;
    2. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    3. import org.apache.rocketmq.spring.core.RocketMQListener;
    4. import org.springframework.stereotype.Service;
    5. /**
    6. * 消费事务消息
    7. * 配置RocketMQ监听
    8. */
    9. @Service
    10. @RocketMQMessageListener(consumerGroup = "common-customer-tx", topic = "common-topic-tx",selectorExpression = "TAGA||TAGB||TAGC",messageModel = MessageModel.CLUSTERING)
    11. public class CommonTxConsumerListener implements RocketMQListener {
    12. @Override
    13. public void onMessage(String s) {
    14. System.out.println("消费消息 事务消息:" + s);
    15. }
    16. }

    其他调用方法,请研究工具类。 

    代码

    七、相关问题

    1.如何获取 Topic-Broker 的映射关系?

            Producer 和 Consumer 启动时,也都需要指定 namesrvAddr 的地址,从 Namesrv 集群中选一台建立长连接。生产者每 30 秒从 Namesrv 获取 Topic 跟 Broker 的映射关系,更新到本地内存中。然后再跟 Topic 涉及的所有 Broker 建立长连接,每隔 30 秒发一次心跳。

    2、消费者消费的几种模式?

    RocketMQ 消费者有集群消费和广播消费两种消费模式。

    • 集群消费
      •     一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。
    • 广播消费
      •     消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

    3.RocketMQ 为何这么快?

    是因为使用了顺序存储、Page Cache 和异步刷盘
    1)在写入 commitLog 的时候是顺序写入的,这样比随机写入的性能有巨大提升。
    2)写入 commitLog 的时候并不是直接写入磁盘,而是先写入操作系统的 PageCache。最后由操作系统异步将缓存中的数据刷到磁盘。

    4.RocketMQ 的角色构成?

    生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

    消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

    消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。

    名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

    5.RocketMQ 执行流程?

    1)启动 Namesrv 后开始监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

    2)Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。

    3)收发消息前,先创建 Topic。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上,也可以在发送消息时自动创建 Topic。

    4)Producer 向该 Topic 发送消息。

    5)Consumer 消费该 Topic 的消息。

    6.Broker 中的消息被消费后会立即删除吗?

            不会,每条消息都会持久化到 CommitLog 中,每个 Consumer 连接到 Broker 后会维持消费进度信息,当有消息消费后只是当前 Consumer 的消费进度(CommitLog的offset)更新了。

    7.Broker 如何保存消息数据?

    RocketMQ 主要的存储文件包括 commitlog、consumequeue 以及 indexfile 三种文件。

            Broker 在收到消息之后,会把消息保存到 commitlog 文件中,同时每个 Topic 对应的 messagequeue 下都会生成 consumequeue 文件用于保存 commitlog 的物理位置偏移量 offset,而 key 和 offset 的对应关系则使用 indexfile 保存。

     

    8.为何使用 NameServer 而非 ZK?

            NameServer 是专为 RocketMQ 设计的轻量级名称服务,为 producer 和 consumer 提供路由信息。具有简单、可集群横吐扩展、无状态,节点之间互不通信等特点。而 RocketMQ 的架构设计决定了只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要 Zookeeper 这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本。

     9.生产环境有多个nameserver该如何连接?

            rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:a:9876;b:9876

    10.发送的消息内容体是如何被序列化与反序列化的?

            RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]

    11.如何指定topic的tags?

            RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称

    注意:

            tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

    12.如何指定消费者从哪开始消费消息,或开始消费的位置?

            消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

    1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    2. import org.apache.rocketmq.common.UtilAll;
    3. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    4. import org.apache.rocketmq.common.message.MessageExt;
    5. import org.apache.rocketmq.spring.annotation.MessageModel;
    6. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    7. import org.apache.rocketmq.spring.core.RocketMQListener;
    8. import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
    9. import org.springframework.stereotype.Service;
    10. /**
    11. * 通用消费者
    12. */
    13. @Service
    14. @RocketMQMessageListener(consumerGroup = "common-customer-group", topic = "common_topic", messageModel = MessageModel.CLUSTERING)
    15. public class CommonConsumerListener implements RocketMQListener , RocketMQPushConsumerLifecycleListener {
    16. @Override
    17. public void onMessage(MessageExt messageExt) {
    18. System.out.println("通用消费者-----------------"+messageExt.toString());
    19. //消费者处理时抛出异常时就会自动重试
    20. // throw new RuntimeException("消费者处理时抛出异常时就会自动重试");
    21. }
    22. //自定义消息 开始消费的位置
    23. @Override
    24. public void prepareStart(DefaultMQPushConsumer consumer) {
    25. // set consumer consume message from now
    26. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    27. consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    28. }
    29. }

    13.NameServer 如何保证最终一致性?

            NameServer 作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是 NameServer 节点之间并不通信,在某个时刻各个节点数据可能不一致的情况下,下面分别从路由注册、路由剔除以及路由发现三个角度进行介绍 NameServer 如何保证最终一致性。

    1)路由注册:Broker 节点在启动时轮训 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。 注册后每 30s 向 NameServer 发送心跳包

    2)路由剔除:正常情况下 Broker 退出后会被 Netty 通道监听器监听到,异常情况下,NameServer 有一个定时任务,每隔 10s 扫描一下 Broker 表,剔除心跳包更新时间超过 120s 的 Broker。

    3)路由发现:由于 NameServer 不会主动推送 Broker 信息,所以 RocketMQ 客户端提供了定时拉取 Topic 最新路由信息的机制(默认是30秒)。

    4)由于路由信息是定时拉取得,所以需要加上(生产者)重试机制

    14.RocketMQ 如何实现事务消息?

            分布式系统中的事务可以使用 TCC(Try、Confirm、Cancel)。2pc 来解决分布式系统中的消息原子性。RocketMQ中提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

            Half Message:预处理消息,当 Broker 收到此类消息后,会存储到 RMQ_SYS_TRANS_HALF_TOPIC 的消息消费队列中。

            检查事务状态:Broker 会开启一个定时任务,消费 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker 会定时去回调在重新检查。

            超时:如果超过回查次数,默认回滚消息。

            事务消息实现也就是它并未真正进入 Topic 的队列中,而是用了临时队列来放所谓的half message,等提交事务后才会真正的将half message转移到 Topic 下的队列中。

    15.RocketMQ 如何实现负载均衡?

    RocketMQ 通过 Topic 在多 Broker 中分布式存储实现负载均衡,同时需要生产者、Broker 以及消费者多个不同角色共同完成。

    Producer
            发送端通过指定 queue 发送消息到相应的Broker 中来达到写入时的负载均衡。默认策略是随机选择,通过自增随机数对列表大小取余获取位置信息,自带容错策略。还可以通过 MessageQueueSelector 的 select 方法实现自定义。

    Consumer
            采用的是平均分配算法来进行负载均衡,支持一下几种负载均衡策略:

    • 平均分配策略,也是默认策略;
    • 环形分配策略;
    • 手动配置分配策略;
    • 机房分配策略;
    • 一致性哈希分配策略:
    • 靠近机房策略。
       

    16.RocketMQ 如何保证消息不丢失?

    RocketMQ 中间件的 Producer、Broker 以及 Consumer 三个组成部分都有可能导致消息的丢失。

    Producer 如何保证消息不丢失

    • 采取 send() 同步发送消息,发送结果是同步感知的。
    • 发送失败后可以重试,设置重试次数,默认 3 次。producer.setRetryTimesWhenSendFailed(10);
    • 集群部署,比如发送失败了的原因可能是当前 Broker 宕机了,重试的时候会发送到其它的 Broker 上。

    Broker 如何保证消息不丢失

    • 修改刷盘策略为同步刷盘,默认情况下是异步刷盘(即消息到内存后就返回确认信息)。
    • 集群部署,默认异步复制到 slave,可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

    Consumer 如何保证消息不丢失
            消费过程需要注意返回消息状态,只有当业务逻辑真正执行成功后,才能返回 CONSUME_SUCCESS 的 ACK 确认。

    17.高吞吐量下如何优化生产者和消费者?

    1)同一个 Group 下,多机部署,并行消费;

    2)单个 Consumer 提高消费线程个数;

    3)批量消费:批量拉取拉去消息以及业务逻辑的批量处理。

  • 相关阅读:
    flutter useRootNavigator属性的作用
    Linux学习-69-Linux系统启动管理
    【汇编语言】笔记
    网络流——EK算法求最大流
    plink如何更新表型数据
    conan包管理工具(1)
    vue-pdf(v4.3.0)
    flink状态后端和检查点的关系
    物联网低代码平台选型六大标准,赶快学起来
    VM系列振弦采集模块频率计算与质量评定
  • 原文地址:https://blog.csdn.net/weixin_43549578/article/details/126839860