• RocketMq4 消息发送示例及源码浅阅


    example

    启动rockerMq:

    nohup ./mqnamesrv -n192.168.66.66:9876 >/dev/null 2>&1 & 
    nohup ./mqbroker -c ../conf/broker.conf >/dev/null 2>&1 & 
    
    #broker.conf 配置 
    brokerClusterName = DefaultCluster 
    brokerName = broker-a brokerIP1=192.168.XX.xx 
    namesrvAddr=192.168.xx.xx:9876 
    brokerId = 0 
    deleteWhen = 04 
    fileReservedTime = 48 
    brokerRole = ASYNC_MASTER 
    flushDiskType = ASYNC_FLUSH
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    启动控制台:
    消息发送:

    public class RocketMQProvider {
        public static void main(String[] args) throws Exception {
            //System.out.println(System.getProperty("user.home")); 
            // System.setProperty("rocketmq.client.log.loadconfig", "false"); 
            // 创建一个消息的生产者,且指定一个组 
            DefaultMQProducer producer = new DefaultMQProducer("groupA");
            //设置 namesrv 地址 
            // 可以从此地址获取 topic 的队列信息 
            producer.setNamesrvAddr("192.168.xx.xx:9876"); // producer.setSendMsgTimeout(6000); 
            // 设置 //producer.setDefaultTopicQueueNums(5); 
            // 思考:topic 下面有多个队列,应该向那个队列发送消息??? 
            // 开启 
            producer.start();
            //创建多条消息 
            for (int i = 0; i < 10; i++) {
                //创建消息对象
                Message message = new Message("topicA", "tagA", ("helloA" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 设置消息延时级别 
                // message.setDelayTimeLevel(6); 
                // 创建消息队列 
                // MessageQueue queue = new MessageQueue("jackhu-01","jackhu",0); 
                // 发送消息,默认负载策略 
                SendResult result = producer.send(message);
                //打印 
                System.out.println("发送消息返回结果:" + result);
            }
            //关闭 
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    在这里插入图片描述

    发送流程

    获取路由信息

    • 获取路由信息
      在这里插入图片描述

    • 先从缓存中获取
      topicPublishInfoTable就是本地缓存数据。
      在这里插入图片描述

    • 获取namesvr路由信息
      在这里插入图片描述
      根据远程调用对象remotingclient调用namesvr,获取路由信息。Remoteing底层就是netty

    • 重试次数

    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    
    • 1

    重试次数:
    - 同步发送: 失败重试次数 == 设置次数(默认是2次) + 1
    - 异步发送: 默认就是1次

    选择其中一个队列发送消息(负载均衡)

    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    
    • 1

    选择其中一个队列发送消息:

    public MessageQueue selectOneMessageQueue() {
    // 随机递增取模算法
        int index = this.sendWhichQueue.getAndIncrement();
    // 值%队列长度
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注意:默认算法,叫做随机递增取模算法

    选择负载均衡发送消息时候,容错开关:

    // 如果sendLatencyFaultEnable = true,在发送消息时候就会根据容错策略来进行相应的退避策略。
    private boolean sendLatencyFaultEnable = false;
    
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 发送消息核心代码
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    
    • 1

    在这里插入图片描述

    负载均衡算法

    随机递增取模算法(默认算法)

    在这里插入图片描述

    随机算法

    随机算法:根据队列的长度,获取队列长度范围的随机数,根据获得结构,返回此角标位置的队列。
    在这里插入图片描述

    Hash算法

    在这里插入图片描述

    机房算法

    在这里插入图片描述

  • 相关阅读:
    RFID管理方案有效提升电力物资管理效率与资产安全
    蓝桥杯备赛Day8——队列
    【算法 | 位运算No.1】leetcode268. 丢失的数字
    bigdecimal保留两位小数
    2022年春季学期新疆电大一体化科学与技术第1次作业-1【标准答案】
    02_Alibaba微服务组件Nacos注册中心
    11.19 - 每日一题 - 408
    [附源码]java毕业设计本科毕业设计过程管理系统
    从四分钟到两秒——谈谈客户端性能优化的一些最佳实践
    来啦来啦|开源 * 安全 * 赋能 - .NET Conf China 2022
  • 原文地址:https://blog.csdn.net/Miaoshuowen/article/details/126376431