• rocketmq发送消息底层分析


    企业中一般都会封装rocketmq 同步 异步 单向方法,你只需要配置好nameserver地址 topic tag 消息体等,然后调用封装方法进行发送即可。

    流程差不多如下

    1、导入mq依赖

            
                org.apache.rocketmq
                rocketmq-client
                4.8.0
            
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、消息发送者步骤
    创建消息生成者producer,指定组名
    指定Nameserver mq地址
    启动生产者
    创建消息对象,指定Topic、Tag和消息体
    发送消息
    关闭生产者

    3、消费者步骤
    创建消费者Consumer,指定组名
    指定Nameserver地址
    订阅主题Topic和Tag
    (listener监听消息,去消费先到的消息)
    设置回调函数,处理消息
    启动消费者

    同步 异步 单向底层区别

    //同步
                SendResult send = producer.send(msg);
                //异步
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("===" + sendResult);
                    }
    
                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("e:" + throwable);
                    }
                });
                //单向
                producer.sendOneway(msg);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    异步相对同步来说send方法多了些参数SendCallback sendCallback。而这个sendCallback重写的方法有成功和失败的方法。
    单向相对同步来说换了一个方法sendOneway.
    在跟踪源码去看到
    异步
    在这里插入图片描述
    同步
    在这里插入图片描述

    具体实现方法
    一下源码传参异步,单向的区别在
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    以及switch里面
    switch(communicationMode) {
    case ASYNC:
    return null;
    case ONEWAY:
    return null;
    case SYNC:
    if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
    return sendResult;
    }
    default:
    break label122;
    }

    具体如下
    sendDefaultImpl方法(核心:所有同步、异步、单向)

        private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            //校验 Producer 处于运行状态
            this.makeSureStateOK();
            //校验消息格式
            Validators.checkMessage(msg, this.defaultMQProducer);
            long invokeID = this.random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            //获取Topic路由信息
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                //发送的队列
                MessageQueue mq = null;
                //异常
                Exception exception = null;
                //最后一次发送结果
                SendResult sendResult = null;
                //计算调用发送消息到成功为止的最大次数:同步、单向调用一次 ;异步3次
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                //第几次发送
                int times = 0;
                //存储每次发送消息现在broker
                String[] brokersSent = new String[timesTotal];
    
                while(true) {
                    label122: {
                        String info;
                        if (times < timesTotal) {
                            info = null == mq ? null : mq.getBrokerName();
                            //选择要发送的消息队列
                            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                            if (mqSelected != null) {
                                mq = mqSelected;
                                brokersSent[times] = mqSelected.getBrokerName();
    
                                long endTimestamp;
                                try {
                                    beginTimestampPrev = System.currentTimeMillis();
                                    long costTime = beginTimestampPrev - beginTimestampFirst;
                                    if (timeout >= costTime) {
                                        //Kernel内核的意思,调用发送消息核心方法
                                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                        endTimestamp = System.currentTimeMillis();
                                        //更新Broker可用性信息,在选择发送到的消息队列时,会参考Broker发送消息的延迟。源码多次操作Broker,可见不懂Broker就无法真正理解mq源码。
                                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                        switch(communicationMode) {
                                        case ASYNC:
                                            return null;
                                        case ONEWAY:
                                            return null;
                                        case SYNC:
                                            if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                                return sendResult;
                                            }
                                        default:
                                            break label122;
                                        }
                                    }
    
                                    callTimeout = true;
                                } catch (RemotingException var26) {
                                    //当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。
                                   //这也是我们在生产环境看到可能出现多次重复发送的情况。
                                    //更新Broker可用信息,更新继续循环
                                    endTimestamp = System.currentTimeMillis();
                                    this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                    this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var26);
                                    this.log.warn(msg.toString());
                                    exception = var26;
                                    break label122;
                                } catch (MQClientException var27) {
                                   //更新Broker可用信息,继续循环
                                    endTimestamp = System.currentTimeMillis();
                                    this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                    this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var27);
                                    this.log.warn(msg.toString());
                                    exception = var27;
                                    break label122;
                                } catch (MQBrokerException var28) {
                                    //更新Broker可用信息,部分情况下异常返回,结束循环
                                    endTimestamp = System.currentTimeMillis();
                                    this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                                    this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var28);
                                    this.log.warn(msg.toString());
                                    exception = var28;
                                    switch(var28.getResponseCode()) {
                                    case 1:
                                    case 14:
                                    case 16:
                                    case 17:
                                    case 204:
                                    case 205:
                                        break label122;
                                    //如果有发送结果,返回,没有,抛出异常
                                    default:
                                        if (sendResult != null) {
                                            return sendResult;
                                        }
    
                                        throw var28;
                                    }
                                } catch (InterruptedException var29) {
                                    endTimestamp = System.currentTimeMillis();
                                    this.updateFaultItem(mqSelected.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                                    this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mqSelected), var29);
                                    this.log.warn(msg.toString());
                                    this.log.warn("sendKernelImpl exception", var29);
                                    this.log.warn(msg.toString());
                                    throw var29;
                                }
                            }
                        }
    
                        if (sendResult != null) {
                            return sendResult;
                        }
    
                        info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
                        info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                        MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                        if (callTimeout) {
                            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                        }
    
                        if (exception instanceof MQBrokerException) {
                            mqClientException.setResponseCode(((MQBrokerException)exception).getResponseCode());
                        } else if (exception instanceof RemotingConnectException) {
                            mqClientException.setResponseCode(10001);
                        } else if (exception instanceof RemotingTimeoutException) {
                            mqClientException.setResponseCode(10002);
                        } else if (exception instanceof MQClientException) {
                            mqClientException.setResponseCode(10003);
                        }
    
                        throw mqClientException;
                    }
    
                    ++times;
                }
            } else {
                List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
                if (null != nsList && !nsList.isEmpty()) {
                    throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
                } else {
                    throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149

    但是异步是在该sendDefaultImpl方法之前做一个异步处理
    ExecutorService :那么mq异步用了线程池来提高异步请求的效率!!

        @Deprecated
        public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException {
            final long beginStartTime = System.currentTimeMillis();
            ExecutorService executor = this.getCallbackExecutor();
    
            try {
                executor.submit(new Runnable() {
                    public void run() {
                        long costTime = System.currentTimeMillis() - beginStartTime;
                        if (timeout > costTime) {
                            try {
                                DefaultMQProducerImpl.this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                            } catch (Exception var4) {
                                sendCallback.onException(var4);
                            }
                        } else {
                            sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                        }
    
                    }
                });
            } catch (RejectedExecutionException var9) {
                throw new MQClientException("executor rejected ", var9);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这里插入图片描述
    NettyRemotingClient定义了多线程

    在这里插入图片描述
    并没有发现是哪种特殊的多线程,因此是通用的多线程。就好比map和hashmap,最后用了map一样。

    倒回来看,这里是多线程执行,大大提高了运行效率,并且代码严谨和规范,有判定条件,不同程度的try catch
    在这里插入图片描述

    线程池执行sendSelectImpl方法,传了msg,async异步,sendCallback(成功和失败的方法,发送消息不用等待响应,不管有没有成功或失败,生产者还会发送消息),timeout - costTime(最大等待时间默认3000-消耗时间)
    大概看一下入参的情况
    在这里插入图片描述
    这里timeTotal默认是1,异步的时候是3,影响后面循环次数。
    yi
    那么for循环里面要做什么事情呢
    选择消息要发送的队列,准备发送
    调用核心发送消息方法
    更新Broker可用性信息
    上面这个类sendDefaultImpl,有详细注释。
    在这里插入图片描述
    其实还可以继续深入下去,可以模仿rocketmq,自己写一个自定义mq框架那也是很厉害了!!!

  • 相关阅读:
    【Proteus仿真】【51单片机】电蒸锅温度控制系统
    react父子组件通信
    int *a, int **a, int a[], int *a[]的区别
    ffmpeg的基本功能介绍
    类别不均衡,离群点以及分布改变
    PostMan接口测试实用小点
    【Java】day01 - Java基础语法
    java 如何查看对象占内存大小
    后端研发工程师面经——数据库
    LeetCode刷题(8)
  • 原文地址:https://blog.csdn.net/weixin_43206161/article/details/126082330