• RocketMQ如何安全的批量发送消息❓


    优点:

    批量发送消息可以提高rocketmq的生产者性能和吞吐量

    使用场景:

    1. 发送大量小型消息时;
    2. 需要降低消息发送延迟时;
    3. 需要提高生产者性能时;

    注意事项:

    1. 消息列表的大小不能超过broker设置的最大消息大小;
    2. 消息列表的大小不能超过生产证设置的maxMessageSize 参数,此参数默认为 4MB;
    3. 批量发送消息不支持消息事务;
    4. 如果代码在发送消息列表时发生异常,则可能会发生部分消息发送成功,部分消息发送失败的情况。如果要确保所有消息都已成功发送,则需要增加错误处理逻辑和消息重试机制;


    批量发送消息为什么要限制maxMessageSize❓

    消息列表的大小不能超过生产者设置的maxMessageSize参数,主要是为了避免消息发送延迟和消息过大导致broker出现性能问题。如果尝试发送大于maxMessageSize的消息,RocketMQ会抛出MessageTooLargeException异常,并且消息不会被发送到broker。

    如果开发者在开发时遇到了消息列表大小超过maxMessageSize的情况,可以考虑以下几种处理方式:

      1. 提升maxMessageSize参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到RocketMQ broker的性能和网络带宽等因素。
      2. 考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
      3. 计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
      4. 对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。

    代码实现

    1. package com.resource.sync.rocketmq;
    2. import java.util.Iterator;
    3. import java.util.List;
    4. /**
    5. * @description:消息分割,在rocketmq中,一次性发送消息的长度不可超过4mb,此时我们需要进行切割,确保消息长度小于4mb
    6. **/
    7. public class ListSplitter implements Iterator> {
    8. /**
    9. * 分割数据大小
    10. */
    11. private int sizeLimit;
    12. /**
    13. * 分割数据列表
    14. */
    15. private final List messages;
    16. /**
    17. * 分割索引
    18. */
    19. private int currIndex;
    20. public ListSplitter(int sizeLimit, List messages) {
    21. this.sizeLimit = sizeLimit;
    22. this.messages = messages;
    23. }
    24. @Override
    25. public boolean hasNext() {
    26. return currIndex < messages.size();
    27. }
    28. @Override
    29. public List next() {
    30. int nextIndex = currIndex;
    31. int totalSize = 0;
    32. for (; nextIndex < messages.size(); nextIndex++) {
    33. T t = messages.get(nextIndex);
    34. totalSize = totalSize + t.toString().length();
    35. if (totalSize > sizeLimit) {
    36. break;
    37. }
    38. }
    39. List subList = messages.subList(currIndex, nextIndex);
    40. currIndex = nextIndex;
    41. return subList;
    42. }
    43. }
    1. private final int maxMessageSize = 1024 * 1024 * 4;
    2. /**
    3. * 消息分割(批量发送)
    4. */
    5. private void bulkSendMsg(List> messageList) {
    6. // 限制数据大小
    7. ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);
    8. while (splitter.hasNext()) {
    9. List nextList = splitter.next();
    10. syncBulkSendMessage("topic", nextList);
    11. }
    12. }
    13. /**
    14. * @param topic
    15. * @param list
    16. * @description:发送实时消息(批量)
    17. */
    18. public void syncBulkSendMessage(String topic, List list) {
    19. SendResult sendResult = null;
    20. try {
    21. sendResult = rocketMQTemplate.syncSend(topic, list);
    22. if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    23. log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());
    24. }
    25. if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    26. log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());
    27. }
    28. } catch (Exception e) {
    29. log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);
    30. }
    31. }

  • 相关阅读:
    代码随想录day56|583. 两个字符串的删除操作72. 编辑距离
    家庭用户无线上网案例(AC通过三层口对AP进行管理)
    vue基础知识六:v-show和v-if有什么区别?使用场景分别是什么?
    功能测试:核心原理、挑战以及解决之道
    密码学 - RSA签名算法
    C++ 语言学习 day02 (linux ) delete 函数 面对对象的类,构造函数,析构函数
    读像火箭科学家一样思考笔记05_思想实验
    【rust】8、连接数据库:sqlx
    华为机试 - 最长连续子序列
    顺序表基本操作-查找
  • 原文地址:https://blog.csdn.net/weixin_41860630/article/details/134257112