• 03 RocketMQ - Broker 源码分析


    Broker 整体流程

    Broker 消息存储

    • CommitLog
      消息存储文件,所有的消息都存储在 CommitLog 中
      在这里插入图片描述
      CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享。每一个消息的存储长度是不固定的,前4个字节存储该条消息的总长度。在 CommitLog 中顺序写,随机读。CommitLog 文件默认大小为 1G,当文件满了,写入下一个文件,可通过在 broker 配置文件中设置 mapedFileSizeCommitLog 属性来改变默认大小。

    • ConsumeQueue
      ConsumeQueue 消息消费队列,可以看成是基于topic的commitlog索引文件,存储的是消息的物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件,引入的目的主要是提高消息消费的性能。
      在这里插入图片描述
      在这里插入图片描述
      ConsumeQueue中存储的是消息条目,为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,消息条目如下:

      ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

    • IndexFile
      IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。

      在这里插入图片描述

    • Config
      config 文件夹中存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在此。
      consumerFilter.json :主题消息过滤信息
      consumerOffset.json :集群消费模式消息消进度
      delayOffset.json :延时消息队列拉取进度
      subscriptionGroup.json :消息消费组配置信息
      topics.json : topic 配置属性

      在这里插入图片描述

    Broker 启动源码


    BrokerStartup.class

    public static void main(String[] args) {
            start(createBrokerController(args));
        }
    
    • 1
    • 2
    • 3

    BrokerStartup::createBrokerController()

    1. 解析命令行参数
    2. 是否选择 dleger技术(raft)
    3. 创建 BrokerController
    4. 初始化 BrokerController
    public static BrokerController createBrokerController(String[] args) {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    
            if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
                NettySystemConfig.socketSndbufSize = 131072;
            }
    
            if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
                NettySystemConfig.socketRcvbufSize = 131072;
            }
    
            try {
                //PackageConflictDetect.detectFastjson();
                // 1.解析命令行参数
                Options options = ServerUtil.buildCommandlineOptions(new Options());
                commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                    new PosixParser());
                if (null == commandLine) {
                    System.exit(-1);
                }
    
                final BrokerConfig brokerConfig = new BrokerConfig();
                //netty服务器配置,与生产者通信
                final NettyServerConfig nettyServerConfig = new NettyServerConfig();
                //netty客户端配置,与NameSever通信
                final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    
                nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
                nettyServerConfig.setListenPort(10911);
                final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
                //如果是从节点
                if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                    int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                    messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
                }
                //解析命令行中 -c 的参数
                if (commandLine.hasOption('c')) {
                    String file = commandLine.getOptionValue('c');
                    if (file != null) {
                        configFile = file;
                        InputStream in = new BufferedInputStream(new FileInputStream(file));
                        properties = new Properties();
                        properties.load(in);
    
                        properties2SystemEnv(properties);
                        MixAll.properties2Object(properties, brokerConfig);
                        MixAll.properties2Object(properties, nettyServerConfig);
                        MixAll.properties2Object(properties, nettyClientConfig);
                        MixAll.properties2Object(properties, messageStoreConfig);
    
                        BrokerPathConfigHelper.setBrokerConfigPath(file);
                        in.close();
                    }
                }
    
                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
    
                if (null == brokerConfig.getRocketmqHome()) {
                    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                    System.exit(-2);
                }
                //获取nameSever地址
                String namesrvAddr = brokerConfig.getNamesrvAddr();
                if (null != namesrvAddr) {
                    try {
                        String[] addrArray = namesrvAddr.split(";");
                        for (String addr : addrArray) {
                            RemotingUtil.string2SocketAddress(addr);
                        }
                    } catch (Exception e) {
                        System.out.printf(
                            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                            namesrvAddr);
                        System.exit(-3);
                    }
                }
                //主从设置
                switch (messageStoreConfig.getBrokerRole()) {
                    case ASYNC_MASTER:
                    case SYNC_MASTER:
                        brokerConfig.setBrokerId(MixAll.MASTER_ID);
                        break;
                    case SLAVE:
                        if (brokerConfig.getBrokerId() <= 0) {
                            System.out.printf("Slave's brokerId must be > 0");
                            System.exit(-3);
                        }
    
                        break;
                    default:
                        break;
                }
                // 2.是否选择 dleger技术 raft:类似于ZK选举,但是性能不好,BUG比较多,推荐不使用
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    brokerConfig.setBrokerId(-1);
                }
    
                messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
                LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
                JoranConfigurator configurator = new JoranConfigurator();
                configurator.setContext(lc);
                lc.reset();
                configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
    
                if (commandLine.hasOption('p')) {
                    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                    MixAll.printObjectProperties(console, brokerConfig);
                    MixAll.printObjectProperties(console, nettyServerConfig);
                    MixAll.printObjectProperties(console, nettyClientConfig);
                    MixAll.printObjectProperties(console, messageStoreConfig);
                    System.exit(0);
                } else if (commandLine.hasOption('m')) {
                    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                    MixAll.printObjectProperties(console, brokerConfig, true);
                    MixAll.printObjectProperties(console, nettyServerConfig, true);
                    MixAll.printObjectProperties(console, nettyClientConfig, true);
                    MixAll.printObjectProperties(console, messageStoreConfig, true);
                    System.exit(0);
                }
    
                log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
                MixAll.printObjectProperties(log, brokerConfig);
                MixAll.printObjectProperties(log, nettyServerConfig);
                MixAll.printObjectProperties(log, nettyClientConfig);
                MixAll.printObjectProperties(log, messageStoreConfig);
                // 3.创建 BrokerController
                final BrokerController controller = new BrokerController(
                    brokerConfig,
                    nettyServerConfig,
                    nettyClientConfig,
                    messageStoreConfig);
                // remember all configs to prevent discard
                controller.getConfiguration().registerConfig(properties);
                //4. 初始化 BrokerController
                boolean initResult = controller.initialize();
                if (!initResult) {
                    controller.shutdown();
                    System.exit(-3);
                }
                //jvm关闭的勾子函数
                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    private volatile boolean hasShutdown = false;
                    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    
                    @Override
                    public void run() {
                        synchronized (this) {
                            log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                            if (!this.hasShutdown) {
                                this.hasShutdown = true;
                                long beginTime = System.currentTimeMillis();
                                controller.shutdown();
                                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                                log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                            }
                        }
                    }
                }, "ShutdownHook"));
    
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168

    BrokerController::initialize()
    1.加载 config 文件夹里面的相关信息
    2.创建消息存储管理组件
    3.存储组件启动(包含零拷贝MMAP技术)
    4.创建处理消息的线程池
    5.注册各种处理消息请求(功能号设计)
    6.开启持久化配置文件相关定时任务
    7.处理事务相关消息

    public boolean initialize() throws CloneNotSupportedException {
            //1.加载 config 信息
            //todo 加载Broker中的主题信息  json
            boolean result = this.topicConfigManager.load();
            //todo 加载消费进度
            result = result && this.consumerOffsetManager.load();
            //todo 加载订阅信息
            result = result && this.subscriptionGroupManager.load();
            //todo 加载订消费者过滤信息
            result = result && this.consumerFilterManager.load();
    
            if (result) {
                try {
                    //2.创建消息存储管理组件
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
                    //如果开启了多副本机制,会为集群节点选主器添加roleChangeHandler事件处理器,即节点发送变更后的事件处理器。
                    if (messageStoreConfig.isEnableDLegerCommitLog()) {
                        DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                        ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                    }
                    //broker的统计组件
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
            //3.存储组件启动(包含零拷贝MMAP技术)
    
            result = result && this.messageStore.load();
            if (result) {
                //构建netty服务端(监听10911)
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                //在RocketMQ 4.5.1之后,10909端口vipChannelEnabled默认为了false)
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
                //4.创建处理消息的线程池
                //发送消息的线程池
                this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.sendThreadPoolQueue,
                    new ThreadFactoryImpl("SendMessageThread_"));
                //pull消息的线程池
                this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));
                //应答线程池
                this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.replyThreadPoolQueue,
                    new ThreadFactoryImpl("ProcessReplyMessageThread_"));
                //查询线程池
                this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.queryThreadPoolQueue,
                    new ThreadFactoryImpl("QueryMessageThread_"));
    
                this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                        "AdminBrokerThread_"));
    
                this.clientManageExecutor = new ThreadPoolExecutor(
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.clientManagerThreadPoolQueue,
                    new ThreadFactoryImpl("ClientManageThread_"));
                //心跳处理的线程池
                this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getHeartbeatThreadPoolNums(),
                    this.brokerConfig.getHeartbeatThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.heartbeatThreadPoolQueue,
                    new ThreadFactoryImpl("HeartbeatThread_", true));
    
                this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getEndTransactionThreadPoolNums(),
                    this.brokerConfig.getEndTransactionThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.endTransactionThreadPoolQueue,
                    new ThreadFactoryImpl("EndTransactionThread_"));
    
                this.consumerManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                        "ConsumerManageThread_"));
                //5.注册各种处理消息请求(功能号设计)
                this.registerProcessor();
                //6.开启持久化配置文件相关定时任务
                final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
                final long period = 1000 * 60 * 60 * 24;
                //检查broker的状态
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.getBrokerStats().record();
                        } catch (Throwable e) {
                            log.error("schedule record error.", e);
                        }
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
                //consumerOffset
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerFilterManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumer filter error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.protectBroker();
                        } catch (Throwable e) {
                            log.error("protectBroker error.", e);
                        }
                    }
                }, 3, 3, TimeUnit.MINUTES);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printWaterMark();
                        } catch (Throwable e) {
                            log.error("printWaterMark error.", e);
                        }
                    }
                }, 10, 1, TimeUnit.SECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                        } catch (Throwable e) {
                            log.error("schedule dispatchBehindBytes error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
                if (this.brokerConfig.getNamesrvAddr() != null) {
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                            } catch (Throwable e) {
                                log.error("ScheduledTask fetchNameServerAddr exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
                if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                            this.updateMasterHAServerAddrPeriodically = false;
                        } else {
                            this.updateMasterHAServerAddrPeriodically = true;
                        }
                    } else {
                        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    BrokerController.this.printMasterAndSlaveDiff();
                                } catch (Throwable e) {
                                    log.error("schedule printMasterAndSlaveDiff error.", e);
                                }
                            }
                        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                    }
                }
    
                if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                    // Register a listener to reload SslContext
                    try {
                        fileWatchService = new FileWatchService(
                            new String[] {
                                TlsSystemConfig.tlsServerCertPath,
                                TlsSystemConfig.tlsServerKeyPath,
                                TlsSystemConfig.tlsServerTrustCertPath
                            },
                            new FileWatchService.Listener() {
                                boolean certChanged, keyChanged = false;
    
                                @Override
                                public void onChanged(String path) {
                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                        log.info("The trust certificate changed, reload the ssl context");
                                        reloadServerSslContext();
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                        certChanged = true;
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                        keyChanged = true;
                                    }
                                    if (certChanged && keyChanged) {
                                        log.info("The certificate and private key changed, reload the ssl context");
                                        certChanged = keyChanged = false;
                                        reloadServerSslContext();
                                    }
                                }
    
                                private void reloadServerSslContext() {
                                    ((NettyRemotingServer) remotingServer).loadSslContext();
                                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                                }
                            });
                    } catch (Exception e) {
                        log.warn("FileWatchService created error, can't load the certificate dynamically");
                    }
                }
                //7.处理事务相关消息
                initialTransaction();
                initialAcl();
                initialRpcHooks();
            }
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267

    BrokerStartup::start()

    public static BrokerController start(BrokerController controller) {
            try {
                //启动
                controller.start();
    
                String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
    
                if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                    tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
                }
    
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    BrokerController::start()

    public void start() throws Exception {
            //启动消息存储组件
            if (this.messageStore != null) {
                this.messageStore.start();
            }
            //启动netty服务器
            if (this.remotingServer != null) {
                this.remotingServer.start();
            }
    
            if (this.fastRemotingServer != null) {
                this.fastRemotingServer.start();
            }
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
            //todo 对外通信组件,例如给namesever发心跳
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.start();
            }
    
            if (this.pullRequestHoldService != null) {
                this.pullRequestHoldService.start();
            }
    
            if (this.clientHousekeepingService != null) {
                this.clientHousekeepingService.start();
            }
    
            if (this.filterServerManager != null) {
                this.filterServerManager.start();
            }
            //开启DLedger多副本主从切换
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                //开启事务状态回查处理器
                startProcessorByHa(messageStoreConfig.getBrokerRole());
                //处理从节点的元数据同步
                handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
                this.registerBrokerAll(true, false, true);
            }
            //todo broker每隔30s向NameServer发送心跳包
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        //todo 向NameServer发送心跳包
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.start();
            }
    
            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.start();
            }
    
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    Broker 消息写入源码

    存储设计层次:
    image.png

    • 业务层:也可以称之为网络层,就是收到消息之后,一般交给 SendMessageProcessor 来分配(交给哪个业务来处理)。DefaultMessageStore,这个是存储层最核心的入口。
    • 存储逻辑层:主要负责各种存储的逻辑,里面有很多跟存储同名的类。
    • 存储I/O层:主要负责存储的具体的消息与I/O处理。


    普通消息写入流程:

    CommitLog 写入

    RocketMQ 使用 Netty 处理网络,broker 收到消息写入的请求就会进入 SendMessageProcessor类 中processRequest 方法。

    最终进入DefaultMessageStore类中asyncPutMessage方法进行消息的存储

    然后消息进入commitlog类中的asyncPutMessage方法进行消息的存储

    ConsumeQueue 与 IndexFile 写入

    消息在写入 CommitLog 后,会有一个定时任务每毫秒去推送消息将新加入的消息写入 ConsumeQueue 与 IndexFile

    在 broker 启动时,会开启这个线程去处理写入任务 ,具体类 ReputMessageService
    在这里插入图片描述
    后续看 ReputMessageService::run() 即可

    public void run() {
                DefaultMessageStore.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        Thread.sleep(1);
                        this.doReput();
                    } catch (Exception e) {
                        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                DefaultMessageStore.log.info(this.getServiceName() + " service end");
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    核心方法 doReput()

    private void doReput() {
                if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                    log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                    this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
                }
                //循环读取每一条消息
                for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
    
                    if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                        break;
                    }
                    //读取结果
                    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                    if (result != null) {
                        try {
                            this.reputFromOffset = result.getStartOffset();
    
                            for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                                //创建DispatchRequest对象
                                DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                                int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    
                                if (dispatchRequest.isSuccess()) {
                                    if (size > 0) {
                                        //如果消息长度大于0,则调用doDispatch方法
                                        //todo  跳转到 CommitLogDispatcherBuildConsumeQueue.dispatch()方法
                                        DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                                        if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        }
    
                                        this.reputFromOffset += size;
                                        readSize += size;
                                        if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                            DefaultMessageStore.this.storeStatsService
                                                .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                            DefaultMessageStore.this.storeStatsService
                                                .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                                .addAndGet(dispatchRequest.getMsgSize());
                                        }
                                    } else if (size == 0) {
                                        this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                        readSize = result.getSize();
                                    }
                                } else if (!dispatchRequest.isSuccess()) {
    
                                    if (size > 0) {
                                        log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                        this.reputFromOffset += size;
                                    } else {
                                        doNext = false;
                                        // If user open the dledger pattern or the broker is master node,
                                        // it will not ignore the exception and fix the reputFromOffset variable
                                        if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                            DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                            log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                                this.reputFromOffset);
                                            this.reputFromOffset += result.getSize() - readSize;
                                        }
                                    }
                                }
                            }
                        } finally {
                            result.release();
                        }
                    } else {
                        doNext = false;
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    写入处理类 CommitLogDispatcher
    在这里插入图片描述

    Broker 核心设计

    NRS 与 NRC 的功能号设计

    客户端发送消息时定义一个code,对应一个功能,服务端注册一个业务处理,对应一个code的业务处理。code对应码表RequestCode类。

    客户端:
    RocketMQ 的通讯使用的是 Netty,作为客户端核心类有两种:RemotingCommand与NettyRemotingClient。

    • RemotingCommand 主要处理消息的组装:包括消息头、消息序列化与反序列化。
    • NettyRemotingClient 主要处理消息的发送:包括同步、异步、单向、注册等操作。

    发送消息传入 code 值源码
    在这里插入图片描述


    拉取消息传入 code 值源码
    在这里插入图片描述

    服务端:
    Broker 启动时将服务端需要处理的 ExecutorService 存放到 processorTable 中

    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
            new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
    
    • 1
    • 2

    在启动流程中 BrokerController 类中的 initialize() 中
    image.png
    image.png

    Commitlog 写入时锁的选择

    RocketMQ 在写入消息到 CommitLog 中时,使用了锁机制,即同一时刻只有一个线程可以写CommitLog文件。CommitLog 中使用了两种锁,一个是自旋锁,另一个是重入锁

    在这里插入图片描述
    根据配置可选择使用自旋锁或重入锁,在CommitLog 的构造方法中可看到

    //useReentrantLockWhenPutMessage参数默认是false,使用自旋锁。异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁
            this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ?
                    new PutMessageReentrantLock() : new PutMessageSpinLock();
    
    //重入锁
    public class PutMessageReentrantLock implements PutMessageLock {
        private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
    
        @Override
        public void lock() {
            putMessageNormalLock.lock();
        }
    
        @Override
        public void unlock() {
            putMessageNormalLock.unlock();
        }
    }
    
    //自旋锁
    public class PutMessageSpinLock implements PutMessageLock {
        //true: Can lock, false : in lock.
        private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
    
        @Override
        public void lock() {
            boolean flag;
            do {
                flag = this.putMessageSpinLock.compareAndSet(true, false);
            }
            while (!flag);
        }
    
        @Override
        public void unlock() {
            this.putMessageSpinLock.compareAndSet(false, true);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    RocketMQ 官方文档优化建议:异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁

    • 同步刷盘时,锁竞争激烈,会有较多的线程处于等待阻塞等待锁的状态,如果采用自旋锁会浪费很多的CPU时间,所以“同步刷盘建议使用重入锁”。

    • 异步刷盘时,间隔一定的时间刷一次盘,锁竞争不激烈,不会存在大量阻塞等待锁的线程,偶尔锁等待就自旋等待一下很短的时间,不要进行上下文切换了,所以采用自旋锁更合适。

    MMAP

    10分钟了解什么是内存映射MMAP!
    mmap ---- 内存映射原理

    RocketMQ底层对commitLog、consumeQueue之类的磁盘文件的读写操作都采用了mmap技术。具体到代码里面就是利用JDK里面NIO的MapperByteBuffer的map()函数,来先将磁盘文件(CommitLog文件、consumeQueue文件)映射到内存里来。

    实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

    相关源码:
    MappedFile::init()
    将磁盘文件映射到内存

    private void init(final String fileName, final int fileSize) throws IOException {
            this.fileName = fileName;
            this.fileSize = fileSize;
            this.file = new File(fileName);
            this.fileFromOffset = Long.parseLong(this.file.getName());
            boolean ok = false;
    
            ensureDirOK(this.file.getParent());
    
            try {
                //文件通道 fileChannel
                this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
                //FileChannel配合着ByteBuffer,将读写的数据缓存到内存中(操纵大文件时可以显著提高效率)
                //MappedByteBuffer (零拷贝之内存映射:mmap)
                //FileChannel 定义了一个 map() 方法,它可以把一个文件从 position 位置开始的 size 大小的区域映射为内存映像文件
                this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    
                //原子操作类 ---CAS的原子操作类--多线程效率(加锁)
                TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
                TOTAL_MAPPED_FILES.incrementAndGet();
                ok = true;
            } catch (FileNotFoundException e) {
                log.error("Failed to create file " + this.fileName, e);
                throw e;
            } catch (IOException e) {
                log.error("Failed to map file " + this.fileName, e);
                throw e;
            } finally {
                if (!ok && this.fileChannel != null) {
                    this.fileChannel.close();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    MappedFile::appendMessagesInner()
    消息写入

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
            assert messageExt != null;
            assert cb != null;
            //当前这个MaapedFile的写入位置
            int currentPos = this.wrotePosition.get();
    
            if (currentPos < this.fileSize) {
                //异步输盘时还有两种刷盘模式可以选择
                //TODO 如果writeBuffer!= null开启了堆外内存缓冲,使用writeBuffer,否则使用mappedByteBuffer(也是继承的ByteBuffer)
                //slice方法创建一个新的字节缓冲区
                ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
                byteBuffer.position(currentPos);//指定ByteBuffer中的position
                AppendMessageResult result;
                if (messageExt instanceof MessageExtBrokerInner) {
                    //todo 非批量处理
                    //写入具体的数据 commitlog中的数据格式
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
                } else if (messageExt instanceof MessageExtBatch) {
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
                } else {
                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
                }
                this.wrotePosition.addAndGet(result.getWroteBytes());
                this.storeTimestamp = result.getStoreTimestamp();
                return result;
            }
            log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    堆外内存

    JVM——堆外内存详解

    开启堆外内存需要修改配置文件broker:transientStorePoolEnable=true,且必须为异步刷盘和主节点
    DefaultMessageStore::DefaultMessageStore()





    堆外缓冲区流程

    RocketMQ单独创建一个ByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。同时因为是堆外内存,这么设计可以避免频繁的GC。

    本质上分为两个阶段:
    一个阶段先写入堆外内存,另外一个阶段通过定时任务再写入文件。

    从图中可以发现,默认方式,mmap+PageCache的方式,读写消息都走的是pageCache(MappedByteBuffer类),这样读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写(脏页面)。

    而如果采用堆外缓冲区,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(MappedByteBuffer类),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

    所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘才能使用(因为写数据分成了两步,同步刷盘延迟就会比较大)。

  • 相关阅读:
    MySQL存储过程
    互联网医院App开发:构建医疗服务的技术指南
    第五十七章 CSP的常见问题 - 如何调试CSP页面?
    Go语言VSCode开发环境配置
    行为引擎-暴力破解
    人工智能知识全面讲解:RNN的实现方式
    遥感IDL二次开发(辐射定标)
    【存储】cosbench对象存储测试工具
    字符串函数的详解
    Java测试题(核心基础)
  • 原文地址:https://blog.csdn.net/qq_33512765/article/details/126659101