• RocketMQ批量发送消息是负载均衡的吗❓


    问题:RocketMQ批量发送消息是负载均衡的吗?

    答:在rocketmq 4.0版本中不是负载均衡的,而是随机挑选的一个队列。

    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); 其中index是一个随机整数.

    源码中先有函数batch,将Collection msgs转换为MessageBatch:

    1. public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    2. // 判断是否开启发送延迟故障容错机制
    3. if (this.sendLatencyFaultEnable) {
    4. try {
    5. // 获取一个随机整数作为index
    6. int index = tpInfo.getSendWhichQueue().getAndIncrement();
    7. // 遍历tpInfo.getMessageQueueList(),选择一个MessageQueue
    8. for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    9. int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    10. if (pos < 0)
    11. pos = 0;
    12. MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    13. // 判断mq所属的BrokerName是否可用
    14. if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
    15. // 如果lastBrokerName为空或者与mq所属的BrokerName相等,则返回该mq
    16. if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
    17. return mq;
    18. }
    19. }
    20. // 获取一个不是最佳Broker的BrokerName
    21. final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    22. // 根据notBestBroker获取对应的writeQueueNums
    23. int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    24. if (writeQueueNums > 0) {
    25. // 选择一个MessageQueue
    26. final MessageQueue mq = tpInfo.selectOneMessageQueue();
    27. if (notBestBroker != null) {
    28. // 设置mq的BrokerName为notBestBroker
    29. mq.setBrokerName(notBestBroker);
    30. // 设置mq的QueueId为tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums
    31. mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    32. }
    33. return mq;
    34. } else {
    35. // 移除notBestBroker
    36. latencyFaultTolerance.remove(notBestBroker);
    37. }
    38. } catch (Exception e) {
    39. // 记录错误日志
    40. log.error("Error occurred when selecting message queue", e);
    41. }
    42. // 返回选择的MessageQueue
    43. return tpInfo.selectOneMessageQueue();
    44. }
    45. // 返回选择的MessageQueue
    46. return tpInfo.selectOneMessageQueue(lastBrokerName);
    47. }

    而后有函数selectOneMessageQueue,为MessageBatch选取一个队列:

    1. public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    2. if (this.sendLatencyFaultEnable) {
    3. try {
    4. int index = tpInfo.getSendWhichQueue().getAndIncrement();// index为随机整数
    5. for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    6. int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    7. if (pos < 0)
    8. pos = 0;
    9. MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    10. if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
    11. if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
    12. return mq;
    13. }
    14. }
    15. final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    16. int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    17. if (writeQueueNums > 0) {
    18. final MessageQueue mq = tpInfo.selectOneMessageQueue();
    19. if (notBestBroker != null) {
    20. mq.setBrokerName(notBestBroker);
    21. mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    22. }
    23. return mq;
    24. } else {
    25. latencyFaultTolerance.remove(notBestBroker);
    26. }
    27. } catch (Exception e) {
    28. log.error("Error occurred when selecting message queue", e);
    29. }
    30. return tpInfo.selectOneMessageQueue();
    31. }
    32. return tpInfo.selectOneMessageQueue(lastBrokerName);
    33. }

    最后调用函数sendKernelImpl发送消息:

    1. private SendResult sendKernelImpl(final Message msg,
    2. final MessageQueue mq,
    3. final CommunicationMode communicationMode,
    4. final SendCallback sendCallback,
    5. final TopicPublishInfo topicPublishInfo,
    6. final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    7. ...
    8. }

  • 相关阅读:
    过滤表filter达式cql相互转化
    怎么学编程效率高,编程练习网站编程软件下载,中文编程开发语言工具下载
    python的opencv操作记录(八)——小波变换
    计算机毕业设计成品基于Uniapp+SSM实现的新闻APP
    EF Core修改Migration更新数据库表
    TCP/IP(二十一)TCP 实战抓包分析(五)TCP 第三次握手 ACK 丢包
    在线客服系统统计员工的一些工作量,有哪些统计维度?
    行业追踪,2023-10-20
    【Graph Net学习】DeepWalk/Node2Vec实现Graph Embedding
    Nested嵌套对象类型还挺实用
  • 原文地址:https://blog.csdn.net/weixin_41860630/article/details/134257077