批量发送消息可以提高rocketmq的生产者性能和吞吐量。
消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。
如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:
- package com.resource.sync.rocketmq;
-
-
- import java.util.Iterator;
- import java.util.List;
-
- /**
- * @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb
- **/
- public class ListSplitter
implements Iterator> {
- /**
- * 分割数据大小
- */
- private int sizeLimit;
-
- /**
- * 分割数据列表
- */
- private final List
messages; -
- /**
- * 分割索引
- */
- private int currIndex;
-
- public ListSplitter(int sizeLimit, List
messages) { - this.sizeLimit = sizeLimit;
- this.messages = messages;
- }
-
- @Override
- public boolean hasNext() {
- return currIndex < messages.size();
- }
-
- @Override
- public List
next() { - int nextIndex = currIndex;
- int totalSize = 0;
- for (; nextIndex < messages.size(); nextIndex++) {
- T t = messages.get(nextIndex);
- totalSize = totalSize + t.toString().length();
- if (totalSize > sizeLimit) {
- break;
- }
- }
- List
subList = messages.subList(currIndex, nextIndex); - currIndex = nextIndex;
- return subList;
- }
- }
- private final int maxMessageSize = 1024 * 1024 * 4;
-
- /**
- * 消息分割(批量发送)
- */
- private void bulkSendMsg(List
> messageList) { - // 限制数据大小
- ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);
- while (splitter.hasNext()) {
- List
nextList = splitter.next(); - syncBulkSendMessage("topic", nextList);
- }
- }
-
-
- /**
- * @param topic
- * @param list
- * @description:发送实时消息(批量)
- */
- public void syncBulkSendMessage(String topic, List
list) { - SendResult sendResult = null;
- try {
- sendResult = rocketMQTemplate.syncSend(topic, list);
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());
- }
- if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
- log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());
- }
-
- } catch (Exception e) {
- log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);
- }
- }