# RocketMQ Message Middleware: Advanced Features and Source Code Analysis (Part 5)


    I. RocketMQ Source Code Analysis: NameServer Route Metadata

    1. NameServer route management in RocketMQ

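    The main job of the NameServer is to give message producers and consumers the routing information for each Topic. To do that, the NameServer must store the basic routing data and also manage the Broker nodes, which includes route registration and route removal.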

    2. NameServer route metadata in RocketMQ

    1) Code: RouteInfoManager

    
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    

    2) NameServer route entity diagram:

    [Figure: NameServer route entity diagram (image not available)]

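    3) Notes:

    topicQueueTable: message queue routing information for each Topic; message sending is load-balanced according to this routing table.

    brokerAddrTable: basic Broker information, including the brokerName, the name of the cluster it belongs to, and the master and slave Broker addresses.

    clusterAddrTable: Broker cluster information; stores the names of all Brokers in each cluster.

    brokerLiveTable: Broker status information; the NameServer replaces this entry each time it receives a heartbeat packet.

    filterServerTable: the list of FilterServers on each Broker, used for class-mode message filtering.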
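    To make the relationship between these tables concrete, here is a minimal sketch of how one Broker heartbeat could populate them. RouteTableSketch, QueueInfo and registerBroker are hypothetical names invented for this illustration; they are not the RocketMQ classes, the value types are simplified (an address map instead of BrokerData, a plain timestamp instead of BrokerLiveInfo), and filterServerTable is left out. Letting only the master publish queue data is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for RouteInfoManager (not the RocketMQ class).
final class RouteTableSketch {

    // Slimmed-down QueueData: queue counts of one topic on one broker group.
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueInfo(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    // topic -> queue data per broker group
    final Map<String, List<QueueInfo>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> broker address)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> broker names
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerAddr -> time of the last heartbeat
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    // Handles one heartbeat: records the broker in every table.
    void registerBroker(String clusterName, String brokerName, long brokerId,
                        String brokerAddr, String topic, long nowMillis) {
        clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, brokerAddr);
        // Each heartbeat replaces the previous liveness entry.
        brokerLiveTable.put(brokerAddr, nowMillis);
        // Assumption of this sketch: only the master (brokerId 0) publishes queue data.
        if (brokerId == 0L) {
            List<QueueInfo> queues = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
            queues.removeIf(q -> q.brokerName.equals(brokerName));
            queues.add(new QueueInfo(brokerName, 4, 4));
        }
    }

    public static void main(String[] args) {
        RouteTableSketch tables = new RouteTableSketch();
        long now = System.currentTimeMillis();
        tables.registerBroker("DefaultCluster", "broker-a", 0L, "192.168.0.1:10911", "TopicTest", now);
        tables.registerBroker("DefaultCluster", "broker-a", 1L, "192.168.0.2:10911", "TopicTest", now);
        System.out.println("clusterAddrTable = " + tables.clusterAddrTable);
        System.out.println("brokerAddrTable  = " + tables.brokerAddrTable);
        System.out.println("live brokers     = " + tables.brokerLiveTable.keySet());
    }
}
```

    Note that the master and the slave share one brokerAddrTable entry keyed by brokerName, while brokerLiveTable is keyed by address, so each node has its own heartbeat record.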

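    RocketMQ is based on a publish-subscribe mechanism. A Topic has multiple message queues, and by default a Broker creates 4 read queues and 4 write queues for each topic. Multiple Brokers form a cluster, and Brokers that share the same brokerName form a Master-Slave group: a brokerId of 0 denotes the Master, and a brokerId greater than 0 denotes a Slave. The lastUpdateTimestamp field in BrokerLiveInfo stores the time at which the last heartbeat packet was received from the Broker.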
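    The brokerId convention and the heartbeat timestamp can be illustrated with a small sketch. BrokerLivenessSketch and its methods are hypothetical names, and the 120-second expiry passed in the demo is an assumed value chosen for illustration; the sketch only shows one plausible way a stored lastUpdateTimestamp could drive route removal, not the actual NameServer code.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical helper, not part of RocketMQ.
final class BrokerLivenessSketch {
    // brokerId 0 denotes the Master, anything greater denotes a Slave.
    static final long MASTER_ID = 0L;

    static boolean isMaster(long brokerId) {
        return brokerId == MASTER_ID;
    }

    // Removes every broker whose last heartbeat is older than expireMillis
    // and returns how many entries were removed.
    static int scanNotActiveBroker(Map<String, Long> lastUpdateTimestamps,
                                   long nowMillis, long expireMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdateTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + expireMillis < nowMillis) {
                it.remove(); // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> live = new HashMap<>();
        long now = System.currentTimeMillis();
        live.put("192.168.0.1:10911", now);            // fresh heartbeat
        live.put("192.168.0.2:10911", now - 300_000L); // silent for five minutes
        // Assumed expiry of 120 seconds, for illustration only.
        int removed = scanNotActiveBroker(live, now, 120_000L);
        System.out.println("removed=" + removed + ", still live=" + live.keySet());
    }
}
```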

    4) Entity data example diagrams

    [Figures: two example diagrams of the entity data (images not available)]

    3. Source code of org.apache.rocketmq.broker.BrokerStartup.java:

    /*
    D:\RocketMQ\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerStartup.java
    
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.broker;
    
    import ch.qos.logback.classic.LoggerContext;
    import ch.qos.logback.classic.joran.JoranConfigurator;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.PosixParser;
    import org.apache.rocketmq.common.BrokerConfig;
    import org.apache.rocketmq.common.MQVersion;
    import org.apache.rocketmq.common.MixAll;
    import org.apache.rocketmq.common.constant.LoggerName;
    import org.apache.rocketmq.logging.InternalLogger;
    import org.apache.rocketmq.logging.InternalLoggerFactory;
    import org.apache.rocketmq.remoting.common.RemotingUtil;
    import org.apache.rocketmq.remoting.common.TlsMode;
    import org.apache.rocketmq.remoting.netty.NettyClientConfig;
    import org.apache.rocketmq.remoting.netty.NettyServerConfig;
    import org.apache.rocketmq.remoting.netty.NettySystemConfig;
    import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
    import org.apache.rocketmq.remoting.protocol.RemotingCommand;
    import org.apache.rocketmq.srvutil.ServerUtil;
    import org.apache.rocketmq.store.config.BrokerRole;
    import org.apache.rocketmq.store.config.MessageStoreConfig;
    import org.slf4j.LoggerFactory;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
    
    public class BrokerStartup {
        public static Properties properties = null;
        public static CommandLine commandLine = null;
        public static String configFile = null;
        public static InternalLogger log;
    
        public static void main(String[] args) {
            start(createBrokerController(args));
        }
    
        public static BrokerController start(BrokerController controller) {
            try {
    
                controller.start();
    
                String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
    
                if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                    tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
                }
    
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
        public static void shutdown(final BrokerController controller) {
            if (null != controller) {
                controller.shutdown();
            }
        }
    
        public static BrokerController createBrokerController(String[] args) {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    
            if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
                NettySystemConfig.socketSndbufSize = 131072;
            }
    
            if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
                NettySystemConfig.socketRcvbufSize = 131072;
            }
    
            try {
                //PackageConflictDetect.detectFastjson();
                Options options = ServerUtil.buildCommandlineOptions(new Options());
                commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                    new PosixParser());
                if (null == commandLine) {
                    System.exit(-1);
                }
    
                final BrokerConfig brokerConfig = new BrokerConfig();
                final NettyServerConfig nettyServerConfig = new NettyServerConfig();
                final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    
                nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
                nettyServerConfig.setListenPort(10911);
                final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    
                if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                    int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
                    messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
                }
    
                if (commandLine.hasOption('c')) {
                    String file = commandLine.getOptionValue('c');
                    if (file != null) {
                        configFile = file;
                        InputStream in = new BufferedInputStream(new FileInputStream(file));
                        properties = new Properties();
                        properties.load(in);
    
                        properties2SystemEnv(properties);
                        MixAll.properties2Object(properties, brokerConfig);
                        MixAll.properties2Object(properties, nettyServerConfig);
                        MixAll.properties2Object(properties, nettyClientConfig);
                        MixAll.properties2Object(properties, messageStoreConfig);
    
                        BrokerPathConfigHelper.setBrokerConfigPath(file);
                        in.close();
                    }
                }
    
                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
    
                if (null == brokerConfig.getRocketmqHome()) {
                    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
                    System.exit(-2);
                }
    
                String namesrvAddr = brokerConfig.getNamesrvAddr();
                if (null != namesrvAddr) {
                    try {
                        String[] addrArray = namesrvAddr.split(";");
                        for (String addr : addrArray) {
                            RemotingUtil.string2SocketAddress(addr);
                        }
                    } catch (Exception e) {
                        System.out.printf(
                            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                            namesrvAddr);
                        System.exit(-3);
                    }
                }
    
                switch (messageStoreConfig.getBrokerRole()) {
                    case ASYNC_MASTER:
                    case SYNC_MASTER:
                        brokerConfig.setBrokerId(MixAll.MASTER_ID);
                        break;
                    case SLAVE:
                        if (brokerConfig.getBrokerId() <= 0) {
                            System.out.printf("Slave's brokerId must be > 0");
                            System.exit(-3);
                        }
    
                        break;
                    default:
                        break;
                }
    
                // The HA (master-slave replication) port is derived as listenPort + 1
                messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
                LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
                JoranConfigurator configurator = new JoranConfigurator();
                configurator.setContext(lc);
                lc.reset();
                configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
    
                if (commandLine.hasOption('p')) {
                    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                    MixAll.printObjectProperties(console, brokerConfig);
                    MixAll.printObjectProperties(console, nettyServerConfig);
                    MixAll.printObjectProperties(console, nettyClientConfig);
                    MixAll.printObjectProperties(console, messageStoreConfig);
                    System.exit(0);
                } else if (commandLine.hasOption('m')) {
                    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
                    MixAll.printObjectProperties(console, brokerConfig, true);
                    MixAll.printObjectProperties(console, nettyServerConfig, true);
                    MixAll.printObjectProperties(console, nettyClientConfig, true);
                    MixAll.printObjectProperties(console, messageStoreConfig, true);
                    System.exit(0);
                }
    
                log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
                MixAll.printObjectProperties(log, brokerConfig);
                MixAll.printObjectProperties(log, nettyServerConfig);
                MixAll.printObjectProperties(log, nettyClientConfig);
                MixAll.printObjectProperties(log, messageStoreConfig);
    
                final BrokerController controller = new BrokerController(
                    brokerConfig,
                    nettyServerConfig,
                    nettyClientConfig,
                    messageStoreConfig);
                // remember all configs to prevent discard
                controller.getConfiguration().registerConfig(properties);
    
                boolean initResult = controller.initialize();
                if (!initResult) {
                    controller.shutdown();
                    System.exit(-3);
                }
    
                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    private volatile boolean hasShutdown = false;
                    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    
                    @Override
                    public void run() {
                        synchronized (this) {
                            log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                            if (!this.hasShutdown) {
                                this.hasShutdown = true;
                                long beginTime = System.currentTimeMillis();
                                controller.shutdown();
                                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                                log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                            }
                        }
                    }
                }, "ShutdownHook"));
    
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
        private static void properties2SystemEnv(Properties properties) {
            if (properties == null) {
                return;
            }
            String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
            String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
            System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
            System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
        }
    
        private static Options buildCommandlineOptions(final Options options) {
            Option opt = new Option("c", "configFile", true, "Broker config properties file");
            opt.setRequired(false);
            options.addOption(opt);
    
            opt = new Option("p", "printConfigItem", false, "Print all config item");
            opt.setRequired(false);
            options.addOption(opt);
    
            opt = new Option("m", "printImportantConfig", false, "Print important config item");
            opt.setRequired(false);
            options.addOption(opt);
    
            return options;
        }
    }
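
    Three startup rules in createBrokerController are easy to miss in the full listing: the HA port is derived as listenPort + 1, the name server address list is split on ";" and each entry is validated, and a master is forced to brokerId 0 while a slave must have a brokerId greater than 0. The following sketch restates them in isolation. BrokerStartupSketch and its methods are hypothetical, and parseNamesrvAddr is a simplified stand-in for the check done with RemotingUtil.string2SocketAddress, not a copy of it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of three BrokerStartup rules; not RocketMQ code.
final class BrokerStartupSketch {
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    static final long MASTER_ID = 0L;
    static final int DEFAULT_LISTEN_PORT = 10911; // value set in createBrokerController

    // The HA listen port is always listenPort + 1.
    static int haListenPort(int listenPort) {
        return listenPort + 1;
    }

    // Splits "host:port;host:port" and rejects malformed entries.
    static List<String> parseNamesrvAddr(String namesrvAddr) {
        List<String> result = new ArrayList<>();
        for (String addr : namesrvAddr.split(";")) {
            int idx = addr.lastIndexOf(':');
            if (idx <= 0 || idx == addr.length() - 1) {
                throw new IllegalArgumentException("illegal address: " + addr);
            }
            // NumberFormatException is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(addr.substring(idx + 1));
            if (port < 0 || port > 65535) {
                throw new IllegalArgumentException("illegal port: " + addr);
            }
            result.add(addr);
        }
        return result;
    }

    // Masters always get brokerId 0; a slave must be configured with an id > 0.
    static long resolveBrokerId(Role role, long configuredId) {
        switch (role) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                return MASTER_ID;
            case SLAVE:
                if (configuredId <= 0) {
                    throw new IllegalArgumentException("Slave's brokerId must be > 0");
                }
                return configuredId;
            default:
                return configuredId;
        }
    }

    public static void main(String[] args) {
        System.out.println("HA port: " + haListenPort(DEFAULT_LISTEN_PORT));
        System.out.println("name servers: " + parseNamesrvAddr("127.0.0.1:9876;192.168.0.1:9876"));
        System.out.println("slave id: " + resolveBrokerId(Role.SLAVE, 1L));
    }
}
```

    Unlike the original, which prints a message and calls System.exit, the sketch throws IllegalArgumentException so that the rules can be exercised in a test.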
    
    

    4. Source code of org.apache.rocketmq.broker.BrokerController.java:

    /*
    D:\RocketMQ\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.java
    
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.broker;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.apache.rocketmq.acl.AccessValidator;
    import org.apache.rocketmq.broker.client.ClientHousekeepingService;
    import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
    import org.apache.rocketmq.broker.client.ConsumerManager;
    import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
    import org.apache.rocketmq.broker.client.ProducerManager;
    import org.apache.rocketmq.broker.client.net.Broker2Client;
    import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
    import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
    import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
    import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
    import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
    import org.apache.rocketmq.broker.latency.BrokerFastFailure;
    import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
    import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
    import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
    import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
    import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
    import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
    import org.apache.rocketmq.broker.out.BrokerOuterAPI;
    import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
    import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
    import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
    import org.apache.rocketmq.broker.processor.ClientManageProcessor;
    import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
    import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
    import org.apache.rocketmq.broker.processor.PullMessageProcessor;
    import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
    import org.apache.rocketmq.broker.processor.SendMessageProcessor;
    import org.apache.rocketmq.broker.slave.SlaveSynchronize;
    import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
    import org.apache.rocketmq.broker.topic.TopicConfigManager;
    import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
    import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
    import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
    import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
    import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
    import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
    import org.apache.rocketmq.broker.util.ServiceProvider;
    import org.apache.rocketmq.common.BrokerConfig;
    import org.apache.rocketmq.common.Configuration;
    import org.apache.rocketmq.common.DataVersion;
    import org.apache.rocketmq.common.ThreadFactoryImpl;
    import org.apache.rocketmq.common.TopicConfig;
    import org.apache.rocketmq.common.UtilAll;
    import org.apache.rocketmq.common.constant.LoggerName;
    import org.apache.rocketmq.common.constant.PermName;
    import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
    import org.apache.rocketmq.common.protocol.RequestCode;
    import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
    import org.apache.rocketmq.common.stats.MomentStatsItem;
    import org.apache.rocketmq.logging.InternalLogger;
    import org.apache.rocketmq.logging.InternalLoggerFactory;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.RemotingServer;
    import org.apache.rocketmq.remoting.common.TlsMode;
    import org.apache.rocketmq.remoting.netty.NettyClientConfig;
    import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
    import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
    import org.apache.rocketmq.remoting.netty.NettyServerConfig;
    import org.apache.rocketmq.remoting.netty.RequestTask;
    import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
    import org.apache.rocketmq.remoting.protocol.RemotingCommand;
    import org.apache.rocketmq.srvutil.FileWatchService;
    import org.apache.rocketmq.store.DefaultMessageStore;
    import org.apache.rocketmq.store.MessageArrivingListener;
    import org.apache.rocketmq.store.MessageStore;
    import org.apache.rocketmq.store.config.BrokerRole;
    import org.apache.rocketmq.store.config.MessageStoreConfig;
    import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
    import org.apache.rocketmq.store.stats.BrokerStats;
    import org.apache.rocketmq.store.stats.BrokerStatsManager;
    
    public class BrokerController {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
        private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
        private final BrokerConfig brokerConfig;
        private final NettyServerConfig nettyServerConfig;
        private final NettyClientConfig nettyClientConfig;
        private final MessageStoreConfig messageStoreConfig;
        private final ConsumerOffsetManager consumerOffsetManager;
        private final ConsumerManager consumerManager;
        private final ConsumerFilterManager consumerFilterManager;
        private final ProducerManager producerManager;
        private final ClientHousekeepingService clientHousekeepingService;
        private final PullMessageProcessor pullMessageProcessor;
        private final PullRequestHoldService pullRequestHoldService;
        private final MessageArrivingListener messageArrivingListener;
        private final Broker2Client broker2Client;
        private final SubscriptionGroupManager subscriptionGroupManager;
        private final ConsumerIdsChangeListener consumerIdsChangeListener;
        private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
        private final BrokerOuterAPI brokerOuterAPI;
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "BrokerControllerScheduledThread"));
        private final SlaveSynchronize slaveSynchronize;
        private final BlockingQueue<Runnable> sendThreadPoolQueue;
        private final BlockingQueue<Runnable> pullThreadPoolQueue;
        private final BlockingQueue<Runnable> queryThreadPoolQueue;
        private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
        private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
        private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
        private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
        private final FilterServerManager filterServerManager;
        private final BrokerStatsManager brokerStatsManager;
        private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
        private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
        private MessageStore messageStore;
        private RemotingServer remotingServer;
        private RemotingServer fastRemotingServer;
        private TopicConfigManager topicConfigManager;
        private ExecutorService sendMessageExecutor;
        private ExecutorService pullMessageExecutor;
        private ExecutorService queryMessageExecutor;
        private ExecutorService adminBrokerExecutor;
        private ExecutorService clientManageExecutor;
        private ExecutorService heartbeatExecutor;
        private ExecutorService consumerManageExecutor;
        private ExecutorService endTransactionExecutor;
        private boolean updateMasterHAServerAddrPeriodically = false;
        private BrokerStats brokerStats;
        private InetSocketAddress storeHost;
        private BrokerFastFailure brokerFastFailure;
        private Configuration configuration;
        private FileWatchService fileWatchService;
        private TransactionalMessageCheckService transactionalMessageCheckService;
        private TransactionalMessageService transactionalMessageService;
        private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
        private Future<?> slaveSyncFuture;
    
    
        public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
        ) {
            this.brokerConfig = brokerConfig;
            this.nettyServerConfig = nettyServerConfig;
            this.nettyClientConfig = nettyClientConfig;
            this.messageStoreConfig = messageStoreConfig;
            this.consumerOffsetManager = new ConsumerOffsetManager(this);
            this.topicConfigManager = new TopicConfigManager(this);
            this.pullMessageProcessor = new PullMessageProcessor(this);
            this.pullRequestHoldService = new PullRequestHoldService(this);
            this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
            this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
            this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
            this.consumerFilterManager = new ConsumerFilterManager(this);
            this.producerManager = new ProducerManager();
            this.clientHousekeepingService = new ClientHousekeepingService(this);
            this.broker2Client = new Broker2Client(this);
            this.subscriptionGroupManager = new SubscriptionGroupManager(this);
            this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
            this.filterServerManager = new FilterServerManager(this);
    
            this.slaveSynchronize = new SlaveSynchronize(this);
    
            this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
            this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
            this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
            this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
            this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
            this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
            this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
    
            this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
            this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    
            this.brokerFastFailure = new BrokerFastFailure(this);
            this.configuration = new Configuration(
                log,
                BrokerPathConfigHelper.getBrokerConfigPath(),
                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
            );
        }
    
        public BrokerConfig getBrokerConfig() {
            return brokerConfig;
        }
    
        public NettyServerConfig getNettyServerConfig() {
            return nettyServerConfig;
        }
    
        public BlockingQueue<Runnable> getPullThreadPoolQueue() {
            return pullThreadPoolQueue;
        }
    
        public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
            return queryThreadPoolQueue;
        }
    
        public boolean initialize() throws CloneNotSupportedException {
            boolean result = this.topicConfigManager.load();
    
            result = result && this.consumerOffsetManager.load();
            result = result && this.subscriptionGroupManager.load();
            result = result && this.consumerFilterManager.load();
    
            if (result) {
                try {
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                            this.brokerConfig);
                    if (messageStoreConfig.isEnableDLegerCommitLog()) {
                        DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                        ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                    }
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
    
            result = result && this.messageStore.load();
    
            if (result) {
                this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                // The second ("fast") remoting server listens on listenPort - 2
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
                this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    this.brokerConfig.getSendMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.sendThreadPoolQueue,
                    new ThreadFactoryImpl("SendMessageThread_"));
    
                this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));
    
                this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    this.brokerConfig.getQueryMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.queryThreadPoolQueue,
                    new ThreadFactoryImpl("QueryMessageThread_"));
    
                this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                        "AdminBrokerThread_"));
    
                this.clientManageExecutor = new ThreadPoolExecutor(
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    this.brokerConfig.getClientManageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.clientManagerThreadPoolQueue,
                    new ThreadFactoryImpl("ClientManageThread_"));
    
                this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getHeartbeatThreadPoolNums(),
                    this.brokerConfig.getHeartbeatThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.heartbeatThreadPoolQueue,
                    new ThreadFactoryImpl("HeartbeatThread_", true));
    
                this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getEndTransactionThreadPoolNums(),
                    this.brokerConfig.getEndTransactionThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.endTransactionThreadPoolQueue,
                    new ThreadFactoryImpl("EndTransactionThread_"));
    
                this.consumerManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                        "ConsumerManageThread_"));
    
                this.registerProcessor();
    
                // Record broker statistics once a day: first run at the next midnight, then every 24 hours.
                final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
                final long period = 1000 * 60 * 60 * 24;
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.getBrokerStats().record();
                        } catch (Throwable e) {
                            log.error("schedule record error.", e);
                        }
                    }
                }, initialDelay, period, TimeUnit.MILLISECONDS);
    
                // Persist consumer offsets: first run after 10 s, then every flushConsumerOffsetInterval ms.
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerFilterManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumer filter error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.protectBroker();
                        } catch (Throwable e) {
                            log.error("protectBroker error.", e);
                        }
                    }
                }, 3, 3, TimeUnit.MINUTES);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printWaterMark();
                        } catch (Throwable e) {
                            log.error("printWaterMark error.", e);
                        }
                    }
                }, 10, 1, TimeUnit.SECONDS);
    
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                        } catch (Throwable e) {
                            log.error("schedule dispatchBehindBytes error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
    
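                // Use the configured NameServer address list if present; otherwise, if enabled,
                // poll the address server every 2 minutes (first poll after 10 s).
                if (this.brokerConfig.getNamesrvAddr() != null) {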
                    this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                    log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
                } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                            } catch (Throwable e) {
                                log.error("ScheduledTask fetchNameServerAddr exception", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
                }
    
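                // Skipped when the DLedger commit log is enabled. A slave uses the configured master HA
                // address if one is set, otherwise it picks the address up from later registration results
                // (see doRegisterBrokerAll); a master logs the slave's lag every 60 s.
                if (!messageStoreConfig.isEnableDLegerCommitLog()) {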
                    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                        if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                            this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                            this.updateMasterHAServerAddrPeriodically = false;
                        } else {
                            this.updateMasterHAServerAddrPeriodically = true;
                        }
                    } else {
                        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    BrokerController.this.printMasterAndSlaveDiff();
                                } catch (Throwable e) {
                                    log.error("schedule printMasterAndSlaveDiff error.", e);
                                }
                            }
                        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                    }
                }
    
                if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                    // Register a listener to reload SslContext
                    try {
                        fileWatchService = new FileWatchService(
                            new String[] {
                                TlsSystemConfig.tlsServerCertPath,
                                TlsSystemConfig.tlsServerKeyPath,
                                TlsSystemConfig.tlsServerTrustCertPath
                            },
                            new FileWatchService.Listener() {
                                boolean certChanged, keyChanged = false;
    
                                @Override
                                public void onChanged(String path) {
                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                        log.info("The trust certificate changed, reload the ssl context");
                                        reloadServerSslContext();
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                        certChanged = true;
                                    }
                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                        keyChanged = true;
                                    }
                                    if (certChanged && keyChanged) {
                                        log.info("The certificate and private key changed, reload the ssl context");
                                        certChanged = keyChanged = false;
                                        reloadServerSslContext();
                                    }
                                }
    
                                private void reloadServerSslContext() {
                                    ((NettyRemotingServer) remotingServer).loadSslContext();
                                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                                }
                            });
                    } catch (Exception e) {
                        log.warn("FileWatchService created error, can't load the certificate dynamically");
                    }
                }
                initialTransaction();
                initialAcl();
                initialRpcHooks();
            }
            return result;
        }
    
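        /**
         * Loads the transactional message service and check listener through ServiceProvider,
         * falling back to the built-in default implementations when none is found.
         */
        private void initialTransaction() {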
            this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
            if (null == this.transactionalMessageService) {
                this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
                log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
            }
            this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
            if (null == this.transactionalMessageCheckListener) {
                this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
                log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
            }
            this.transactionalMessageCheckListener.setBrokerController(this);
            this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
        }
    
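        /**
         * When ACL is enabled, loads the AccessValidator implementations through ServiceProvider
         * and registers one RPC hook per validator. Exceptions thrown by validate() are
         * deliberately not caught here.
         */
        private void initialAcl() {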
            if (!this.brokerConfig.isAclEnable()) {
                log.info("The broker dose not enable acl");
                return;
            }
    
            List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
            if (accessValidators == null || accessValidators.isEmpty()) {
                log.info("The broker dose not load the AccessValidator");
                return;
            }
    
            for (AccessValidator accessValidator: accessValidators) {
                final AccessValidator validator = accessValidator;
                this.registerServerRPCHook(new RPCHook() {
    
                    @Override
                    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                        //Do not catch the exception
                        validator.validate(validator.parse(request, remoteAddr));
                    }
    
                    @Override
                    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                    }
                });
            }
        }
    
    
        private void initialRpcHooks() {
    
            List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
            if (rpcHooks == null || rpcHooks.isEmpty()) {
                return;
            }
            for (RPCHook rpcHook: rpcHooks) {
                this.registerServerRPCHook(rpcHook);
            }
        }
    
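        /**
         * Binds each request code to its processor and to the thread pool that executes it.
         * Most codes are registered on both remotingServer and fastRemotingServer;
         * PULL_MESSAGE is registered on remotingServer only.
         */
        public void registerProcessor() {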
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            /**
             * QueryMessageProcessor
             */
            NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            /**
             * ClientManageProcessor
             */
            ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            /**
             * ConsumerManageProcessor
             */
            ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            /**
             * EndTransactionProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    
            /**
             * Default
             */
            AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
            this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
            this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        }
    
        public BrokerStats getBrokerStats() {
            return brokerStats;
        }
    
        public void setBrokerStats(BrokerStats brokerStats) {
            this.brokerStats = brokerStats;
        }
    
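        /**
         * When disableConsumeIfConsumerReadSlowly is set, disables consumption for every consumer
         * group whose fall-behind size (in bytes) exceeds consumerFallbehindThreshold.
         * The group name is read from the third '@'-separated field of the stats key.
         */
        public void protectBroker() {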
            if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
                final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
                while (it.hasNext()) {
                    final Map.Entry<String, MomentStatsItem> next = it.next();
                    final long fallBehindBytes = next.getValue().getValue().get();
                    if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
                        final String[] split = next.getValue().getStatsKey().split("@");
                        final String group = split[2];
                        LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
                        this.subscriptionGroupManager.disableConsume(group);
                    }
                }
            }
        }
    
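        /**
         * Returns how long, in milliseconds, the task at the head of the queue has been waiting.
         * Returns 0 if the queue is empty, the head is not a RequestTask, or the result would be negative.
         */
        public long headSlowTimeMills(BlockingQueue<Runnable> q) {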
            long slowTimeMills = 0;
            final Runnable peek = q.peek();
            if (peek != null) {
                RequestTask rt = BrokerFastFailure.castRunnable(peek);
                slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
            }
    
            if (slowTimeMills < 0) {
                slowTimeMills = 0;
            }
    
            return slowTimeMills;
        }
    
        public long headSlowTimeMills4SendThreadPoolQueue() {
            return this.headSlowTimeMills(this.sendThreadPoolQueue);
        }
    
        public long headSlowTimeMills4PullThreadPoolQueue() {
            return this.headSlowTimeMills(this.pullThreadPoolQueue);
        }
    
        public long headSlowTimeMills4QueryThreadPoolQueue() {
            return this.headSlowTimeMills(this.queryThreadPoolQueue);
        }
    
        public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
            return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
        }
    
        public void printWaterMark() {
            LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
            LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
            LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
            LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
        }
    
        public MessageStore getMessageStore() {
            return messageStore;
        }
    
        public void setMessageStore(MessageStore messageStore) {
            this.messageStore = messageStore;
        }
    
        private void printMasterAndSlaveDiff() {
            long diff = this.messageStore.slaveFallBehindMuch();
    
            // XXX: warn and notify me
            log.info("Slave fall behind master: {} bytes", diff);
        }
    
        public Broker2Client getBroker2Client() {
            return broker2Client;
        }
    
        public ConsumerManager getConsumerManager() {
            return consumerManager;
        }
    
        public ConsumerFilterManager getConsumerFilterManager() {
            return consumerFilterManager;
        }
    
        public ConsumerOffsetManager getConsumerOffsetManager() {
            return consumerOffsetManager;
        }
    
        public MessageStoreConfig getMessageStoreConfig() {
            return messageStoreConfig;
        }
    
        public ProducerManager getProducerManager() {
            return producerManager;
        }
    
        public void setFastRemotingServer(RemotingServer fastRemotingServer) {
            this.fastRemotingServer = fastRemotingServer;
        }
    
        public PullMessageProcessor getPullMessageProcessor() {
            return pullMessageProcessor;
        }
    
        public PullRequestHoldService getPullRequestHoldService() {
            return pullRequestHoldService;
        }
    
        public SubscriptionGroupManager getSubscriptionGroupManager() {
            return subscriptionGroupManager;
        }
    
        public void shutdown() {
            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.shutdown();
            }
    
            if (this.clientHousekeepingService != null) {
                this.clientHousekeepingService.shutdown();
            }
    
            if (this.pullRequestHoldService != null) {
                this.pullRequestHoldService.shutdown();
            }
    
            if (this.remotingServer != null) {
                this.remotingServer.shutdown();
            }
    
            if (this.fastRemotingServer != null) {
                this.fastRemotingServer.shutdown();
            }
    
            if (this.fileWatchService != null) {
                this.fileWatchService.shutdown();
            }
    
            if (this.messageStore != null) {
                this.messageStore.shutdown();
            }
    
            this.scheduledExecutorService.shutdown();
            try {
                this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
    
            this.unregisterBrokerAll();
    
            if (this.sendMessageExecutor != null) {
                this.sendMessageExecutor.shutdown();
            }
    
            if (this.pullMessageExecutor != null) {
                this.pullMessageExecutor.shutdown();
            }
    
            if (this.adminBrokerExecutor != null) {
                this.adminBrokerExecutor.shutdown();
            }
    
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.shutdown();
            }
    
            this.consumerOffsetManager.persist();
    
            if (this.filterServerManager != null) {
                this.filterServerManager.shutdown();
            }
    
            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.shutdown();
            }
    
            if (this.consumerFilterManager != null) {
                this.consumerFilterManager.persist();
            }
    
            if (this.clientManageExecutor != null) {
                this.clientManageExecutor.shutdown();
            }
    
            if (this.queryMessageExecutor != null) {
                this.queryMessageExecutor.shutdown();
            }
    
            if (this.consumerManageExecutor != null) {
                this.consumerManageExecutor.shutdown();
            }
    
            if (this.fileWatchService != null) {
                this.fileWatchService.shutdown();
            }
            if (this.transactionalMessageCheckService != null) {
                this.transactionalMessageCheckService.shutdown(false);
            }
    
            if (this.endTransactionExecutor != null) {
                this.endTransactionExecutor.shutdown();
            }
        }
    
        private void unregisterBrokerAll() {
            this.brokerOuterAPI.unregisterBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId());
        }
    
        public String getBrokerAddr() {
            return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
        }
    
        public void start() throws Exception {
            if (this.messageStore != null) {
                this.messageStore.start();
            }
    
            if (this.remotingServer != null) {
                this.remotingServer.start();
            }
    
            if (this.fastRemotingServer != null) {
                this.fastRemotingServer.start();
            }
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
    
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.start();
            }
    
            if (this.pullRequestHoldService != null) {
                this.pullRequestHoldService.start();
            }
    
            if (this.clientHousekeepingService != null) {
                this.clientHousekeepingService.start();
            }
    
            if (this.filterServerManager != null) {
                this.filterServerManager.start();
            }
    
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                startProcessorByHa(messageStoreConfig.getBrokerRole());
                handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
            }
    
    
    
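            // Register with the NameServers once at startup, forcing the registration.
            this.registerBrokerAll(true, false, true);
    
            // Then re-register periodically, starting after 10 s. The period is registerNameServerPeriod
            // clamped to [10 s, 60 s]: 5000 -> 10000, 30000 -> 30000, 120000 -> 60000.
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {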
    
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
            if (this.brokerStatsManager != null) {
                this.brokerStatsManager.start();
            }
    
            if (this.brokerFastFailure != null) {
                this.brokerFastFailure.start();
            }
    
    
        }
    
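        /**
         * Registers a single topic's configuration with the NameServers. If the broker is not both
         * readable and writable, the topic is registered with the broker's permission instead of its own.
         */
        public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {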
            TopicConfig registerTopicConfig = topicConfig;
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                registerTopicConfig =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
            }
    
            ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
            TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
            topicConfigSerializeWrapper.setDataVersion(dataVersion);
            topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
    
            doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
        }
    
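        /**
         * Registers all topic configurations with the NameServers. If the broker is not both readable
         * and writable, every topic is registered with the broker's permission. The registration is
         * sent only when forceRegister is true or needRegister(...) reports a change.
         */
        public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {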
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                    TopicConfig tmp =
                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
                    topicConfigTable.put(topicConfig.getTopicName(), tmp);
                }
                topicConfigWrapper.setTopicConfigTable(topicConfigTable);
            }
    
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }
    
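        /**
         * Sends the registration request through brokerOuterAPI. Only the first result in the returned
         * list is used to update the HA master address, the slave-sync master address and, when
         * checkOrderConfig is true, the ordered-topic configuration.
         */
        private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,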
            TopicConfigSerializeWrapper topicConfigWrapper) {
            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());
    
            if (registerBrokerResultList.size() > 0) {
                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
                if (registerBrokerResult != null) {
                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                    }
    
                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
    
                    if (checkOrderConfig) {
                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                    }
                }
            }
        }
    
        private boolean needRegister(final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final int timeoutMills) {
    
            TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
            List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
            boolean needRegister = false;
            for (Boolean changed : changeList) {
                if (changed) {
                    needRegister = true;
                    break;
                }
            }
            return needRegister;
        }
    
        public TopicConfigManager getTopicConfigManager() {
            return topicConfigManager;
        }
    
        public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
            this.topicConfigManager = topicConfigManager;
        }
    
        public String getHAServerAddr() {
            return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
        }
    
        public RebalanceLockManager getRebalanceLockManager() {
            return rebalanceLockManager;
        }
    
        public SlaveSynchronize getSlaveSynchronize() {
            return slaveSynchronize;
        }
    
        public ExecutorService getPullMessageExecutor() {
            return pullMessageExecutor;
        }
    
        public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
            this.pullMessageExecutor = pullMessageExecutor;
        }
    
        public BlockingQueue<Runnable> getSendThreadPoolQueue() {
            return sendThreadPoolQueue;
        }
    
        public FilterServerManager getFilterServerManager() {
            return filterServerManager;
        }
    
        public BrokerStatsManager getBrokerStatsManager() {
            return brokerStatsManager;
        }
    
        public List<SendMessageHook> getSendMessageHookList() {
            return sendMessageHookList;
        }
    
        public void registerSendMessageHook(final SendMessageHook hook) {
            this.sendMessageHookList.add(hook);
            log.info("register SendMessageHook Hook, {}", hook.hookName());
        }
    
        public List<ConsumeMessageHook> getConsumeMessageHookList() {
            return consumeMessageHookList;
        }
    
        public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
            this.consumeMessageHookList.add(hook);
            log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
        }
    
        public void registerServerRPCHook(RPCHook rpcHook) {
            getRemotingServer().registerRPCHook(rpcHook);
            this.fastRemotingServer.registerRPCHook(rpcHook);
        }
    
        public RemotingServer getRemotingServer() {
            return remotingServer;
        }
    
        public void setRemotingServer(RemotingServer remotingServer) {
            this.remotingServer = remotingServer;
        }
    
        public void registerClientRPCHook(RPCHook rpcHook) {
            this.getBrokerOuterAPI().registerRPCHook(rpcHook);
        }
    
        public BrokerOuterAPI getBrokerOuterAPI() {
            return brokerOuterAPI;
        }
    
        public InetSocketAddress getStoreHost() {
            return storeHost;
        }
    
        public void setStoreHost(InetSocketAddress storeHost) {
            this.storeHost = storeHost;
        }
    
        public Configuration getConfiguration() {
            return this.configuration;
        }
    
        public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
            return heartbeatThreadPoolQueue;
        }
    
        public TransactionalMessageCheckService getTransactionalMessageCheckService() {
            return transactionalMessageCheckService;
        }
    
        public void setTransactionalMessageCheckService(
            TransactionalMessageCheckService transactionalMessageCheckService) {
            this.transactionalMessageCheckService = transactionalMessageCheckService;
        }
    
        public TransactionalMessageService getTransactionalMessageService() {
            return transactionalMessageService;
        }
    
        public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
            this.transactionalMessageService = transactionalMessageService;
        }
    
        public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
            return transactionalMessageCheckListener;
        }
    
        public void setTransactionalMessageCheckListener(
            AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
            this.transactionalMessageCheckListener = transactionalMessageCheckListener;
        }
    
    
        public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
            return endTransactionThreadPoolQueue;
        }
    
        private void handleSlaveSynchronize(BrokerRole role) {
            if (role == BrokerRole.SLAVE) {
                if (null != slaveSyncFuture) {
                    slaveSyncFuture.cancel(false);
                }
                this.slaveSynchronize.setMasterAddr(null);
                slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                        }
                    }
                }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
            } else {
                //handle the slave synchronise
                if (null != slaveSyncFuture) {
                    slaveSyncFuture.cancel(false);
                }
                this.slaveSynchronize.setMasterAddr(null);
            }
        }
    
        public void changeToSlave(int brokerId) {
            log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    
            // Change the role. Broker ID 0 denotes the master, so a slave falls back to ID 1.
            brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); // TODO: check
            messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
    
            //handle the scheduled service
            try {
                this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
            } catch (Throwable t) {
                log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
            }
    
            //handle the transactional service
            try {
                this.shutdownProcessorByHa();
            } catch (Throwable t) {
                log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
            }
    
            //handle the slave synchronise
            handleSlaveSynchronize(BrokerRole.SLAVE);
    
            try {
                this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
            } catch (Throwable ignored) {
                // Registration failures are ignored here.
            }
            log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
        }
    
    
    
        public void changeToMaster(BrokerRole role) {
            if (role == BrokerRole.SLAVE) {
                return;
            }
            log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
    
            //handle the slave synchronise
            handleSlaveSynchronize(role);
    
            //handle the scheduled service
            try {
                this.messageStore.handleScheduleMessageService(role);
            } catch (Throwable t) {
                log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
            }
    
            //handle the transactional service
            try {
                this.startProcessorByHa(BrokerRole.SYNC_MASTER);
            } catch (Throwable t) {
                log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
            }
    
            // If the operations above are totally successful, we change to master
            brokerConfig.setBrokerId(0); // TODO: check
            messageStoreConfig.setBrokerRole(role);
    
            try {
                this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
            } catch (Throwable ignored) {
                // Registration failures are ignored here.
            }
            log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
        }
    
        private void startProcessorByHa(BrokerRole role) {
            if (BrokerRole.SLAVE != role) {
                if (this.transactionalMessageCheckService != null) {
                    this.transactionalMessageCheckService.start();
                }
            }
        }
    
        private void shutdownProcessorByHa() {
            if (this.transactionalMessageCheckService != null) {
                this.transactionalMessageCheckService.shutdown(true);
            }
        }
    
    
    
    }
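
The registration gate formed by `registerBrokerAll` and `needRegister` in the listing above can be looked at in isolation. The following is a minimal sketch, not RocketMQ code: the class `RegisterDecisionSketch` and its method names are hypothetical, and the list of booleans stands in for the result of `brokerOuterAPI.needRegister`, which presumably holds one answer per NameServer.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the decision made before doRegisterBrokerAll is called.
class RegisterDecisionSketch {

    // Same loop shape as needRegister: stop at the first reported change.
    static boolean anyChanged(List<Boolean> changeList) {
        for (Boolean changed : changeList) {
            if (changed) {
                return true;
            }
        }
        return false;
    }

    // forceRegister short-circuits the check, as in `forceRegister || needRegister(...)`.
    static boolean shouldRegister(boolean forceRegister, List<Boolean> changeList) {
        return forceRegister || anyChanged(changeList);
    }

    public static void main(String[] args) {
        // Forced registration: the change list is never consulted.
        System.out.println(shouldRegister(true, Collections.<Boolean>emptyList()));
        // Not forced, one NameServer reports a change.
        System.out.println(shouldRegister(false, Arrays.asList(false, true)));
        // Not forced, nothing changed.
        System.out.println(shouldRegister(false, Arrays.asList(false, false)));
    }
}
```

When `forceRegister` is left at `true`, which is the initial value of the `forceRegister` field in `BrokerConfig`, the calls in `changeToSlave` and `changeToMaster` skip the change check entirely because `||` short-circuits.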
    
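
The scheduling pattern in `handleSlaveSynchronize` (cancel any earlier task, then schedule a periodic sync only when the broker is a slave) can be sketched with the JDK alone. This is an illustrative sketch under stated assumptions: `SlaveSyncScheduleSketch`, `handleRole`, and `isSyncScheduled` are hypothetical names, and the `Runnable` passed in stands in for `slaveSynchronize.syncAll()`. The 3-second initial delay and 10-second period are the values used in the listing.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the cancel-then-reschedule logic.
class SlaveSyncScheduleSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> slaveSyncFuture;

    void handleRole(boolean slave, Runnable syncTask) {
        // Both branches of the original first cancel a previously scheduled task.
        if (slaveSyncFuture != null) {
            slaveSyncFuture.cancel(false);
        }
        if (slave) {
            // An uncaught exception would suppress all later runs of a fixed-rate task,
            // so the task body is guarded, as in the original.
            Runnable guarded = () -> {
                try {
                    syncTask.run();
                } catch (Throwable t) {
                    System.err.println("sync failed: " + t);
                }
            };
            slaveSyncFuture = scheduler.scheduleAtFixedRate(guarded, 3000, 10000, TimeUnit.MILLISECONDS);
        }
    }

    boolean isSyncScheduled() {
        return slaveSyncFuture != null && !slaveSyncFuture.isCancelled();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        SlaveSyncScheduleSketch sketch = new SlaveSyncScheduleSketch();
        sketch.handleRole(true, () -> System.out.println("syncAll"));
        System.out.println("scheduled as slave: " + sketch.isSyncScheduled());
        sketch.handleRole(false, () -> { });
        System.out.println("scheduled as master: " + sketch.isSyncScheduled());
        sketch.shutdown();
    }
}
```

The `cancel(false)` argument lets a sync that is already running finish instead of interrupting it, which matches the call in the listing.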
    
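
The broker ID handling in `changeToSlave` and `changeToMaster` can also be modeled separately from the rest of the controller. This is a simplified sketch under stated assumptions: `RoleSwitchSketch`, its `Role` enum, and its fields are hypothetical names. Only the ID rule (0 for the master, a non-zero ID for a slave) and the early return when `SLAVE` is passed to `changeToMaster` are taken from the listing.

```java
// Hypothetical model of the role switch; it tracks only the broker ID and the role.
class RoleSwitchSketch {
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    static final long MASTER_ID = 0L; // assumed to match MixAll.MASTER_ID

    private long brokerId = MASTER_ID;
    private Role role = Role.ASYNC_MASTER;

    // A slave may not keep ID 0, so 0 is replaced by 1, as in changeToSlave.
    void changeToSlave(long requestedId) {
        brokerId = (requestedId == MASTER_ID) ? 1L : requestedId;
        role = Role.SLAVE;
    }

    // Calling this with the SLAVE role is a no-op, as in changeToMaster.
    void changeToMaster(Role newRole) {
        if (newRole == Role.SLAVE) {
            return;
        }
        brokerId = MASTER_ID;
        role = newRole;
    }

    long getBrokerId() {
        return brokerId;
    }

    Role getRole() {
        return role;
    }

    public static void main(String[] args) {
        RoleSwitchSketch broker = new RoleSwitchSketch();
        broker.changeToSlave(0);
        System.out.println(broker.getRole() + " id=" + broker.getBrokerId());
        broker.changeToMaster(Role.SYNC_MASTER);
        System.out.println(broker.getRole() + " id=" + broker.getBrokerId());
    }
}
```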

    5. Source code of org.apache.rocketmq.common.BrokerConfig.java:

    /*
    D:\RocketMQ\rocketmq-master\common\src\main\java\org\apache\rocketmq\common\BrokerConfig.java
    
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.common;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import org.apache.rocketmq.common.annotation.ImportantField;
    import org.apache.rocketmq.common.constant.LoggerName;
    import org.apache.rocketmq.common.constant.PermName;
    import org.apache.rocketmq.logging.InternalLogger;
    import org.apache.rocketmq.logging.InternalLoggerFactory;
    import org.apache.rocketmq.remoting.common.RemotingUtil;
    
    public class BrokerConfig {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
    
        private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
        @ImportantField
        private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
        @ImportantField
        private String brokerIP1 = RemotingUtil.getLocalAddress();
        private String brokerIP2 = RemotingUtil.getLocalAddress();
        @ImportantField
        private String brokerName = localHostName();
        @ImportantField
        private String brokerClusterName = "DefaultCluster";
        @ImportantField
        private long brokerId = MixAll.MASTER_ID;
        private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
        private int defaultTopicQueueNums = 8;
        @ImportantField
        private boolean autoCreateTopicEnable = true;
    
        private boolean clusterTopicEnable = true;
    
        private boolean brokerTopicEnable = true;
        @ImportantField
        private boolean autoCreateSubscriptionGroup = true;
        private String messageStorePlugIn = "";
        @ImportantField
        private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
        @ImportantField
        private boolean traceTopicEnable = false;
        /**
         * Number of threads in the send-message thread pool. Because a spin lock is used by default since 4.0.x,
         * the default value is 1.
         */
        private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
        private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
        private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
    
        private int adminBrokerThreadPoolNums = 16;
        private int clientManageThreadPoolNums = 32;
        private int consumerManageThreadPoolNums = 32;
        private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
    
        /**
         * Thread numbers for EndTransactionProcessor
         */
        private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
    
        private int flushConsumerOffsetInterval = 1000 * 5;
    
        private int flushConsumerOffsetHistoryInterval = 1000 * 60;
    
        @ImportantField
        private boolean rejectTransactionMessage = false;
        @ImportantField
        private boolean fetchNamesrvAddrByAddressServer = false;
        private int sendThreadPoolQueueCapacity = 10000;
        private int pullThreadPoolQueueCapacity = 100000;
        private int queryThreadPoolQueueCapacity = 20000;
        private int clientManagerThreadPoolQueueCapacity = 1000000;
        private int consumerManagerThreadPoolQueueCapacity = 1000000;
        private int heartbeatThreadPoolQueueCapacity = 50000;
        private int endTransactionPoolQueueCapacity = 100000;
    
        private int filterServerNums = 0;
    
        private boolean longPollingEnable = true;
    
        private long shortPollingTimeMills = 1000;
    
        private boolean notifyConsumerIdsChangedEnable = true;
    
        private boolean highSpeedMode = false;
    
        private boolean commercialEnable = true;
        private int commercialTimerCount = 1;
        private int commercialTransCount = 1;
        private int commercialBigCount = 1;
        private int commercialBaseCount = 1;
    
        private boolean transferMsgByHeap = true;
        private int maxDelayTime = 40;
    
        private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
        private int registerBrokerTimeoutMills = 6000;
    
        private boolean slaveReadEnable = false;
    
        private boolean disableConsumeIfConsumerReadSlowly = false;
        private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
    
        private boolean brokerFastFailureEnable = true;
        private long waitTimeMillsInSendQueue = 200;
        private long waitTimeMillsInPullQueue = 5 * 1000;
        private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
        private long waitTimeMillsInTransactionQueue = 3 * 1000;
    
        private long startAcceptSendRequestTimeStamp = 0L;
    
        private boolean traceOn = true;
    
        // Switch of filter bit map calculation.
        // If switch on:
        // 1. Calculate filter bit map when construct queue.
        // 2. Filter bit map will be saved to consume queue extend file if allowed.
        private boolean enableCalcFilterBitMap = false;
    
        // Expected number of consumers that will use filtering.
        private int expectConsumerNumUseFilter = 32;
    
        // Error rate of bloom filter, 1~100.
        private int maxErrorRateOfBloomFilter = 20;
    
        // How long filter data is kept after it is marked dead before being cleaned. Default: 24h.
        private long filterDataCleanTimeSpan = 24 * 3600 * 1000;
    
        // Whether to apply filtering on retry.
        private boolean filterSupportRetry = false;
        private boolean enablePropertyFilter = false;
    
        private boolean compressedRegister = false;
    
        private boolean forceRegister = true;
    
        /**
         * This configurable item defines the interval at which the broker registers its topics with the name server.
         * Allowed values are between 10,000 and 60,000 milliseconds.
         */
        private int registerNameServerPeriod = 1000 * 30;
    
        /**
         * The minimum time before a transactional message is checked for the first time; a message can be checked
         * only after it has exceeded this interval.
         */
        @ImportantField
        private long transactionTimeOut = 6 * 1000;
    
        /**
         * The maximum number of times a message is checked; if this value is exceeded, the message is discarded.
         */
        @ImportantField
        private int transactionCheckMax = 15;
    
        /**
         * Transaction message check interval.
         */
        @ImportantField
        private long transactionCheckInterval = 60 * 1000;
    
        /**
         * Acl feature switch
         */
        @ImportantField
        private boolean aclEnable = false;
    
        public static String localHostName() {
            try {
                return InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                log.error("Failed to obtain the host name", e);
            }
    
            return "DEFAULT_BROKER";
        }
    
        public boolean isTraceOn() {
            return traceOn;
        }
    
        public void setTraceOn(final boolean traceOn) {
            this.traceOn = traceOn;
        }
    
        public long getStartAcceptSendRequestTimeStamp() {
            return startAcceptSendRequestTimeStamp;
        }
    
        public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
            this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
        }
    
        public long getWaitTimeMillsInSendQueue() {
            return waitTimeMillsInSendQueue;
        }
    
        public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
            this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
        }
    
        public long getConsumerFallbehindThreshold() {
            return consumerFallbehindThreshold;
        }
    
        public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
            this.consumerFallbehindThreshold = consumerFallbehindThreshold;
        }
    
        public boolean isBrokerFastFailureEnable() {
            return brokerFastFailureEnable;
        }
    
        public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
            this.brokerFastFailureEnable = brokerFastFailureEnable;
        }
    
        public long getWaitTimeMillsInPullQueue() {
            return waitTimeMillsInPullQueue;
        }
    
        public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
            this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
        }
    
        public boolean isDisableConsumeIfConsumerReadSlowly() {
            return disableConsumeIfConsumerReadSlowly;
        }
    
        public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
            this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
        }
    
        public boolean isSlaveReadEnable() {
            return slaveReadEnable;
        }
    
        public void setSlaveReadEnable(final boolean slaveReadEnable) {
            this.slaveReadEnable = slaveReadEnable;
        }
    
        public int getRegisterBrokerTimeoutMills() {
            return registerBrokerTimeoutMills;
        }
    
        public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
            this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
        }
    
        public String getRegionId() {
            return regionId;
        }
    
        public void setRegionId(final String regionId) {
            this.regionId = regionId;
        }
    
        public boolean isTransferMsgByHeap() {
            return transferMsgByHeap;
        }
    
        public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
            this.transferMsgByHeap = transferMsgByHeap;
        }
    
        public String getMessageStorePlugIn() {
            return messageStorePlugIn;
        }
    
        public void setMessageStorePlugIn(String messageStorePlugIn) {
            this.messageStorePlugIn = messageStorePlugIn;
        }
    
        public boolean isHighSpeedMode() {
            return highSpeedMode;
        }
    
        public void setHighSpeedMode(final boolean highSpeedMode) {
            this.highSpeedMode = highSpeedMode;
        }
    
        public String getRocketmqHome() {
            return rocketmqHome;
        }
    
        public void setRocketmqHome(String rocketmqHome) {
            this.rocketmqHome = rocketmqHome;
        }
    
        public String getBrokerName() {
            return brokerName;
        }
    
        public void setBrokerName(String brokerName) {
            this.brokerName = brokerName;
        }
    
        public int getBrokerPermission() {
            return brokerPermission;
        }
    
        public void setBrokerPermission(int brokerPermission) {
            this.brokerPermission = brokerPermission;
        }
    
        public int getDefaultTopicQueueNums() {
            return defaultTopicQueueNums;
        }
    
        public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
            this.defaultTopicQueueNums = defaultTopicQueueNums;
        }
    
        public boolean isAutoCreateTopicEnable() {
            return autoCreateTopicEnable;
        }
    
        public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
            this.autoCreateTopicEnable = autoCreateTopic;
        }
    
        public String getBrokerClusterName() {
            return brokerClusterName;
        }
    
        public void setBrokerClusterName(String brokerClusterName) {
            this.brokerClusterName = brokerClusterName;
        }
    
        public String getBrokerIP1() {
            return brokerIP1;
        }
    
        public void setBrokerIP1(String brokerIP1) {
            this.brokerIP1 = brokerIP1;
        }
    
        public String getBrokerIP2() {
            return brokerIP2;
        }
    
        public void setBrokerIP2(String brokerIP2) {
            this.brokerIP2 = brokerIP2;
        }
    
        public int getSendMessageThreadPoolNums() {
            return sendMessageThreadPoolNums;
        }
    
        public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
            this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
        }
    
        public int getPullMessageThreadPoolNums() {
            return pullMessageThreadPoolNums;
        }
    
        public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
            this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
        }
    
        public int getQueryMessageThreadPoolNums() {
            return queryMessageThreadPoolNums;
        }
    
        public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
            this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
        }
    
        public int getAdminBrokerThreadPoolNums() {
            return adminBrokerThreadPoolNums;
        }
    
        public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
            this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
        }
    
        public int getFlushConsumerOffsetInterval() {
            return flushConsumerOffsetInterval;
        }
    
        public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
            this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
        }
    
        public int getFlushConsumerOffsetHistoryInterval() {
            return flushConsumerOffsetHistoryInterval;
        }
    
        public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
            this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
        }
    
        public boolean isClusterTopicEnable() {
            return clusterTopicEnable;
        }
    
        public void setClusterTopicEnable(boolean clusterTopicEnable) {
            this.clusterTopicEnable = clusterTopicEnable;
        }
    
        public String getNamesrvAddr() {
            return namesrvAddr;
        }
    
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }
    
        public long getBrokerId() {
            return brokerId;
        }
    
        public void setBrokerId(long brokerId) {
            this.brokerId = brokerId;
        }
    
        public boolean isAutoCreateSubscriptionGroup() {
            return autoCreateSubscriptionGroup;
        }
    
        public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
            this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
        }
    
        public boolean isRejectTransactionMessage() {
            return rejectTransactionMessage;
        }
    
        public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
            this.rejectTransactionMessage = rejectTransactionMessage;
        }
    
        public boolean isFetchNamesrvAddrByAddressServer() {
            return fetchNamesrvAddrByAddressServer;
        }
    
        public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
            this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
        }
    
        public int getSendThreadPoolQueueCapacity() {
            return sendThreadPoolQueueCapacity;
        }
    
        public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
            this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
        }
    
        public int getPullThreadPoolQueueCapacity() {
            return pullThreadPoolQueueCapacity;
        }
    
        public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
            this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
        }
    
        public int getQueryThreadPoolQueueCapacity() {
            return queryThreadPoolQueueCapacity;
        }
    
        public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
            this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
        }
    
        public boolean isBrokerTopicEnable() {
            return brokerTopicEnable;
        }
    
        public void setBrokerTopicEnable(boolean brokerTopicEnable) {
            this.brokerTopicEnable = brokerTopicEnable;
        }
    
        public int getFilterServerNums() {
            return filterServerNums;
        }
    
        public void setFilterServerNums(int filterServerNums) {
            this.filterServerNums = filterServerNums;
        }
    
        public boolean isLongPollingEnable() {
            return longPollingEnable;
        }
    
        public void setLongPollingEnable(boolean longPollingEnable) {
            this.longPollingEnable = longPollingEnable;
        }
    
        public boolean isNotifyConsumerIdsChangedEnable() {
            return notifyConsumerIdsChangedEnable;
        }
    
        public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
            this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
        }
    
        public long getShortPollingTimeMills() {
            return shortPollingTimeMills;
        }
    
        public void setShortPollingTimeMills(long shortPollingTimeMills) {
            this.shortPollingTimeMills = shortPollingTimeMills;
        }
    
        public int getClientManageThreadPoolNums() {
            return clientManageThreadPoolNums;
        }
    
        public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
            this.clientManageThreadPoolNums = clientManageThreadPoolNums;
        }
    
        public boolean isCommercialEnable() {
            return commercialEnable;
        }
    
        public void setCommercialEnable(final boolean commercialEnable) {
            this.commercialEnable = commercialEnable;
        }
    
        public int getCommercialTimerCount() {
            return commercialTimerCount;
        }
    
        public void setCommercialTimerCount(final int commercialTimerCount) {
            this.commercialTimerCount = commercialTimerCount;
        }
    
        public int getCommercialTransCount() {
            return commercialTransCount;
        }
    
        public void setCommercialTransCount(final int commercialTransCount) {
            this.commercialTransCount = commercialTransCount;
        }
    
        public int getCommercialBigCount() {
            return commercialBigCount;
        }
    
        public void setCommercialBigCount(final int commercialBigCount) {
            this.commercialBigCount = commercialBigCount;
        }
    
        public int getMaxDelayTime() {
            return maxDelayTime;
        }
    
        public void setMaxDelayTime(final int maxDelayTime) {
            this.maxDelayTime = maxDelayTime;
        }
    
        public int getClientManagerThreadPoolQueueCapacity() {
            return clientManagerThreadPoolQueueCapacity;
        }
    
        public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {
            this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;
        }
    
        public int getConsumerManagerThreadPoolQueueCapacity() {
            return consumerManagerThreadPoolQueueCapacity;
        }
    
        public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {
            this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;
        }
    
        public int getConsumerManageThreadPoolNums() {
            return consumerManageThreadPoolNums;
        }
    
        public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
            this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
        }
    
        public int getCommercialBaseCount() {
            return commercialBaseCount;
        }
    
        public void setCommercialBaseCount(int commercialBaseCount) {
            this.commercialBaseCount = commercialBaseCount;
        }
    
        public boolean isEnableCalcFilterBitMap() {
            return enableCalcFilterBitMap;
        }
    
        public void setEnableCalcFilterBitMap(boolean enableCalcFilterBitMap) {
            this.enableCalcFilterBitMap = enableCalcFilterBitMap;
        }
    
        public int getExpectConsumerNumUseFilter() {
            return expectConsumerNumUseFilter;
        }
    
        public void setExpectConsumerNumUseFilter(int expectConsumerNumUseFilter) {
            this.expectConsumerNumUseFilter = expectConsumerNumUseFilter;
        }
    
        public int getMaxErrorRateOfBloomFilter() {
            return maxErrorRateOfBloomFilter;
        }
    
        public void setMaxErrorRateOfBloomFilter(int maxErrorRateOfBloomFilter) {
            this.maxErrorRateOfBloomFilter = maxErrorRateOfBloomFilter;
        }
    
        public long getFilterDataCleanTimeSpan() {
            return filterDataCleanTimeSpan;
        }
    
        public void setFilterDataCleanTimeSpan(long filterDataCleanTimeSpan) {
            this.filterDataCleanTimeSpan = filterDataCleanTimeSpan;
        }
    
        public boolean isFilterSupportRetry() {
            return filterSupportRetry;
        }
    
        public void setFilterSupportRetry(boolean filterSupportRetry) {
            this.filterSupportRetry = filterSupportRetry;
        }
    
        public boolean isEnablePropertyFilter() {
            return enablePropertyFilter;
        }
    
        public void setEnablePropertyFilter(boolean enablePropertyFilter) {
            this.enablePropertyFilter = enablePropertyFilter;
        }
    
        public boolean isCompressedRegister() {
            return compressedRegister;
        }
    
        public void setCompressedRegister(boolean compressedRegister) {
            this.compressedRegister = compressedRegister;
        }
    
        public boolean isForceRegister() {
            return forceRegister;
        }
    
        public void setForceRegister(boolean forceRegister) {
            this.forceRegister = forceRegister;
        }
    
        public int getHeartbeatThreadPoolQueueCapacity() {
            return heartbeatThreadPoolQueueCapacity;
        }
    
        public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {
            this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;
        }
    
        public int getHeartbeatThreadPoolNums() {
            return heartbeatThreadPoolNums;
        }
    
        public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {
            this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;
        }
    
        public long getWaitTimeMillsInHeartbeatQueue() {
            return waitTimeMillsInHeartbeatQueue;
        }
    
        public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {
            this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;
        }
    
        public int getRegisterNameServerPeriod() {
            return registerNameServerPeriod;
        }
    
        public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
            this.registerNameServerPeriod = registerNameServerPeriod;
        }
    
        public long getTransactionTimeOut() {
            return transactionTimeOut;
        }
    
        public void setTransactionTimeOut(long transactionTimeOut) {
            this.transactionTimeOut = transactionTimeOut;
        }
    
        public int getTransactionCheckMax() {
            return transactionCheckMax;
        }
    
        public void setTransactionCheckMax(int transactionCheckMax) {
            this.transactionCheckMax = transactionCheckMax;
        }
    
        public long getTransactionCheckInterval() {
            return transactionCheckInterval;
        }
    
        public void setTransactionCheckInterval(long transactionCheckInterval) {
            this.transactionCheckInterval = transactionCheckInterval;
        }
    
        public int getEndTransactionThreadPoolNums() {
            return endTransactionThreadPoolNums;
        }
    
        public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
            this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
        }
    
        public int getEndTransactionPoolQueueCapacity() {
            return endTransactionPoolQueueCapacity;
        }
    
        public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
            this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
        }
    
        public long getWaitTimeMillsInTransactionQueue() {
            return waitTimeMillsInTransactionQueue;
        }
    
        public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
            this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
        }
    
        public String getMsgTraceTopicName() {
            return msgTraceTopicName;
        }
    
        public void setMsgTraceTopicName(String msgTraceTopicName) {
            this.msgTraceTopicName = msgTraceTopicName;
        }
        
        public boolean isTraceTopicEnable() {
            return traceTopicEnable;
        }
    
        public void setTraceTopicEnable(boolean traceTopicEnable) {
            this.traceTopicEnable = traceTopicEnable;
        }
    
        public boolean isAclEnable() {
            return aclEnable;
        }
    
        public void setAclEnable(boolean aclEnable) {
            this.aclEnable = aclEnable;
        }
    }
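
    The plain getter/setter shape of this class matters because the startup code fills the config beans from a properties file by matching each key to a setter name. The sketch below illustrates that idea with reflection. It is a simplified stand-in under stated assumptions, not RocketMQ's own binding code: `MiniBrokerConfig` and `PropertyBinder` are hypothetical names, and the default values are placeholders.

```java
import java.lang.reflect.Method;
import java.util.Properties;

/** Hypothetical two-field miniature of a broker config bean (defaults are placeholders). */
class MiniBrokerConfig {
    private int registerNameServerPeriod = 30000;
    private boolean aclEnable = false;

    public int getRegisterNameServerPeriod() { return registerNameServerPeriod; }
    public void setRegisterNameServerPeriod(int v) { this.registerNameServerPeriod = v; }
    public boolean isAclEnable() { return aclEnable; }
    public void setAclEnable(boolean v) { this.aclEnable = v; }
}

/** Simplified properties-to-bean binder; an illustration only. */
final class PropertyBinder {
    /** For every public one-argument setXxx method, look up key "xxx" and apply it. */
    static void bind(Properties props, Object bean) {
        for (Method m : bean.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setAclEnable -> aclEnable
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // key absent: keep the bean's default
            }
            Class<?> type = m.getParameterTypes()[0];
            Object arg;
            if (type == int.class) {
                arg = Integer.parseInt(raw);
            } else if (type == long.class) {
                arg = Long.parseLong(raw);
            } else if (type == boolean.class) {
                arg = Boolean.parseBoolean(raw);
            } else if (type == String.class) {
                arg = raw;
            } else {
                continue; // unsupported parameter type in this sketch
            }
            try {
                m.invoke(bean, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot apply property " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("aclEnable", "true");
        props.setProperty("registerNameServerPeriod", "60000");

        MiniBrokerConfig config = new MiniBrokerConfig();
        bind(props, config);
        System.out.println(config.isAclEnable() + " " + config.getRegisterNameServerPeriod());
    }
}
```

    Keys without a matching setter are ignored, and setters without a matching key leave the default untouched, so a properties file only needs to list the values that differ from the defaults.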
    
    

    6. org.apache.rocketmq.remoting.netty.NettyServerConfig.java source code:

    /*
    D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyServerConfig.java
    
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.remoting.netty;
    
    public class NettyServerConfig implements Cloneable {
        private int listenPort = 8888;
        private int serverWorkerThreads = 8;
        private int serverCallbackExecutorThreads = 0;
        private int serverSelectorThreads = 3;
        private int serverOnewaySemaphoreValue = 256;
        private int serverAsyncSemaphoreValue = 64;
        private int serverChannelMaxIdleTimeSeconds = 120;
    
        private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
        private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
        private boolean serverPooledByteBufAllocatorEnable = true;
    
        /**
         * make make install
         *
         *
         * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
         * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
         */
        private boolean useEpollNativeSelector = false;
    
        public int getListenPort() {
            return listenPort;
        }
    
        public void setListenPort(int listenPort) {
            this.listenPort = listenPort;
        }
    
        public int getServerWorkerThreads() {
            return serverWorkerThreads;
        }
    
        public void setServerWorkerThreads(int serverWorkerThreads) {
            this.serverWorkerThreads = serverWorkerThreads;
        }
    
        public int getServerSelectorThreads() {
            return serverSelectorThreads;
        }
    
        public void setServerSelectorThreads(int serverSelectorThreads) {
            this.serverSelectorThreads = serverSelectorThreads;
        }
    
        public int getServerOnewaySemaphoreValue() {
            return serverOnewaySemaphoreValue;
        }
    
        public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
            this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
        }
    
        public int getServerCallbackExecutorThreads() {
            return serverCallbackExecutorThreads;
        }
    
        public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
            this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
        }
    
        public int getServerAsyncSemaphoreValue() {
            return serverAsyncSemaphoreValue;
        }
    
        public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
            this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
        }
    
        public int getServerChannelMaxIdleTimeSeconds() {
            return serverChannelMaxIdleTimeSeconds;
        }
    
        public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
            this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
        }
    
        public int getServerSocketSndBufSize() {
            return serverSocketSndBufSize;
        }
    
        public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
            this.serverSocketSndBufSize = serverSocketSndBufSize;
        }
    
        public int getServerSocketRcvBufSize() {
            return serverSocketRcvBufSize;
        }
    
        public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
            this.serverSocketRcvBufSize = serverSocketRcvBufSize;
        }
    
        public boolean isServerPooledByteBufAllocatorEnable() {
            return serverPooledByteBufAllocatorEnable;
        }
    
        public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
            this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
        }
    
        public boolean isUseEpollNativeSelector() {
            return useEpollNativeSelector;
        }
    
        public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
            this.useEpollNativeSelector = useEpollNativeSelector;
        }
    
        @Override
        public Object clone() throws CloneNotSupportedException {
            return (NettyServerConfig) super.clone();
        }
    }
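
    NettyServerConfig implements Cloneable so that a second server configuration can be derived from the first without sharing state. The sketch below shows that pattern on a hypothetical two-field class (`MiniServerConfig` and `CloneDemo` are illustrative names, and the port offset is an assumption). Unlike the listing above, it uses a covariant return type and swallows the checked exception, which is a stylistic choice for the example only.

```java
/** Hypothetical two-field stand-in for NettyServerConfig. */
class MiniServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }

    @Override
    public MiniServerConfig clone() {
        try {
            // Object.clone() makes a field-by-field copy; all fields here are primitives.
            return (MiniServerConfig) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError("unreachable: class implements Cloneable", e);
        }
    }
}

final class CloneDemo {
    /** Derives a second config that differs from the base only in its port. */
    static MiniServerConfig deriveWithPort(MiniServerConfig base, int port) {
        MiniServerConfig copy = base.clone();
        copy.setListenPort(port);
        return copy;
    }

    public static void main(String[] args) {
        MiniServerConfig base = new MiniServerConfig();
        MiniServerConfig second = deriveWithPort(base, base.getListenPort() - 2);
        // The base keeps its own port; only the copy was changed.
        System.out.println(base.getListenPort() + " " + second.getListenPort());
    }
}
```

    Because every field of the config is a primitive, the shallow copy made by `Object.clone()` is already a fully independent object.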
    
    

    7. org.apache.rocketmq.remoting.netty.NettyClientConfig.java source code:

    /*
    D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyClientConfig.java
    
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.remoting.netty;
    
    public class NettyClientConfig {
        /**
         * Worker thread number
         */
        private int clientWorkerThreads = 4;
        private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
        private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
        private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
        private int connectTimeoutMillis = 3000;
        private long channelNotActiveInterval = 1000 * 60;
    
        /**
         * IdleStateEvent will be triggered when neither read nor write was performed for
         * the specified period of this time. Specify {@code 0} to disable
         */
        private int clientChannelMaxIdleTimeSeconds = 120;
    
        private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
        private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
        private boolean clientPooledByteBufAllocatorEnable = false;
        private boolean clientCloseSocketIfTimeout = false;
    
        private boolean useTLS;
    
        public boolean isClientCloseSocketIfTimeout() {
            return clientCloseSocketIfTimeout;
        }
    
        public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
            this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
        }
    
        public int getClientWorkerThreads() {
            return clientWorkerThreads;
        }
    
        public void setClientWorkerThreads(int clientWorkerThreads) {
            this.clientWorkerThreads = clientWorkerThreads;
        }
    
        public int getClientOnewaySemaphoreValue() {
            return clientOnewaySemaphoreValue;
        }
    
        public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
            this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
        }
    
        public int getConnectTimeoutMillis() {
            return connectTimeoutMillis;
        }
    
        public void setConnectTimeoutMillis(int connectTimeoutMillis) {
            this.connectTimeoutMillis = connectTimeoutMillis;
        }
    
        public int getClientCallbackExecutorThreads() {
            return clientCallbackExecutorThreads;
        }
    
        public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
            this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
        }
    
        public long getChannelNotActiveInterval() {
            return channelNotActiveInterval;
        }
    
        public void setChannelNotActiveInterval(long channelNotActiveInterval) {
            this.channelNotActiveInterval = channelNotActiveInterval;
        }
    
        public int getClientAsyncSemaphoreValue() {
            return clientAsyncSemaphoreValue;
        }
    
        public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
            this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
        }
    
        public int getClientChannelMaxIdleTimeSeconds() {
            return clientChannelMaxIdleTimeSeconds;
        }
    
        public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
            this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
        }
    
        public int getClientSocketSndBufSize() {
            return clientSocketSndBufSize;
        }
    
        public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
            this.clientSocketSndBufSize = clientSocketSndBufSize;
        }
    
        public int getClientSocketRcvBufSize() {
            return clientSocketRcvBufSize;
        }
    
        public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
            this.clientSocketRcvBufSize = clientSocketRcvBufSize;
        }
    
        public boolean isClientPooledByteBufAllocatorEnable() {
            return clientPooledByteBufAllocatorEnable;
        }
    
        public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
            this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
        }
    
        public boolean isUseTLS() {
            return useTLS;
        }
    
        public void setUseTLS(boolean useTLS) {
            this.useTLS = useTLS;
        }
    }
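
    The two `...SemaphoreValue` settings cap how many one-way and asynchronous requests may be in flight at the same time. A minimal sketch of that kind of limit, assuming a `java.util.concurrent.Semaphore` sized from the config value, could look like this. `InFlightLimiter` is a hypothetical class written for illustration, not part of RocketMQ.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical limiter; maxInFlight plays the role of clientAsyncSemaphoreValue. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    /** Reserves a slot; returns false if none frees up within timeoutMillis. */
    boolean tryBegin(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Call exactly once per successful tryBegin, e.g. from the response callback. */
    void end() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightLimiter limiter = new InFlightLimiter(2);
        System.out.println(limiter.tryBegin(10)); // first slot
        System.out.println(limiter.tryBegin(10)); // second slot
        System.out.println(limiter.tryBegin(10)); // limit reached, times out
        limiter.end();                            // one request completes
        System.out.println(limiter.tryBegin(10)); // slot available again
    }
}
```

    A timed `tryAcquire` lets the caller fail fast with a "too many requests" style error instead of blocking indefinitely when the remote side is slow.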
    
    

    8. org.apache.rocketmq.store.config.MessageStoreConfig.java source code:

    /*
    D:\RocketMQ\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\config\MessageStoreConfig.java
    
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.store.config;
    
    import java.io.File;
    import org.apache.rocketmq.common.annotation.ImportantField;
    import org.apache.rocketmq.store.ConsumeQueue;
    
    public class MessageStoreConfig {
        //The root directory in which the log data is kept
        @ImportantField
        private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
    
        //The directory in which the commitlog is kept
        @ImportantField
        private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
            + File.separator + "commitlog";
    
        // CommitLog file size,default is 1G
        private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
        // ConsumeQueue file size,default is 30W
        private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
        // enable consume queue ext
        private boolean enableConsumeQueueExt = false;
        // ConsumeQueue extend file size, 48M
        private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
        // Bit count of filter bit map.
        // this will be set by pipe of calculate filter bit map.
        private int bitMapLengthConsumeQueueExt = 64;
    
        // CommitLog flush interval
        // flush data to disk
        @ImportantField
        private int flushIntervalCommitLog = 500;
    
        // Only used if TransientStorePool enabled
        // flush data to FileChannel
        @ImportantField
        private int commitIntervalCommitLog = 200;
    
        /**
         * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.
         * By default it is set to false indicating using spin lock when putting message.
         */
        private boolean useReentrantLockWhenPutMessage = false;

        // Whether schedule flush,default is real-time
        @ImportantField
        private boolean flushCommitLogTimed = false;
        // ConsumeQueue flush interval
        private int flushIntervalConsumeQueue = 1000;
        // Resource reclaim interval
        private int cleanResourceInterval = 10000;
        // CommitLog removal interval
        private int deleteCommitLogFilesInterval = 100;
        // ConsumeQueue removal interval
        private int deleteConsumeQueueFilesInterval = 100;
        private int destroyMapedFileIntervalForcibly = 1000 * 120;
        private int redeleteHangedFileInterval = 1000 * 120;
        // When to delete,default is at 4 am
        @ImportantField
        private String deleteWhen = "04";
        private int diskMaxUsedSpaceRatio = 75;
        // The number of hours to keep a log file before deleting it (in hours)
        @ImportantField
        private int fileReservedTime = 72;
        // Flow control for ConsumeQueue
        private int putMsgIndexHightWater = 600000;
        // The maximum size of a single log file,default is 512K
        private int maxMessageSize = 1024 * 1024 * 4;
        // Whether check the CRC32 of the records consumed.
        // This ensures no on-the-wire or on-disk corruption to the messages occurred.
        // This check adds some overhead,so it may be disabled in cases seeking extreme performance.
        private boolean checkCRCOnRecover = true;
        // How many pages are to be flushed when flush CommitLog
        private int flushCommitLogLeastPages = 4;
        // How many pages are to be committed when commit data to file
        private int commitCommitLogLeastPages = 4;
        // Flush page size when the disk in warming state
        private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
        // How many pages are to be flushed when flush ConsumeQueue
        private int flushConsumeQueueLeastPages = 2;
        private int flushCommitLogThoroughInterval = 1000 * 10;
        private int commitCommitLogThoroughInterval = 200;
        private int flushConsumeQueueThoroughInterval = 1000 * 60;
        @ImportantField
        private int maxTransferBytesOnMessageInMemory = 1024 * 256;
        @ImportantField
        private int maxTransferCountOnMessageInMemory = 32;
        @ImportantField
        private int maxTransferBytesOnMessageInDisk = 1024 * 64;
        @ImportantField
        private int maxTransferCountOnMessageInDisk = 8;
        @ImportantField
        private int accessMessageInMemoryMaxRatio = 40;
        @ImportantField
        private boolean messageIndexEnable = true;
        private int maxHashSlotNum = 5000000;
        private int maxIndexNum = 5000000 * 4;
        private int maxMsgsNumBatch = 64;
        @ImportantField
        private boolean messageIndexSafe = false;
        private int haListenPort = 10912;
        private int haSendHeartbeatInterval = 1000 * 5;
        private int haHousekeepingInterval = 1000 * 20;
        private int haTransferBatchSize = 1024 * 32;
        @ImportantField
        private String haMasterAddress = null;
        private int haSlaveFallbehindMax = 1024 * 1024 * 256;
        @ImportantField
        private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
        @ImportantField
        private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
        private int syncFlushTimeout = 1000 * 5;
        private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        private long flushDelayOffsetInterval = 1000 * 10;
        @ImportantField
        private boolean cleanFileForciblyEnable = true;
        private boolean warmMapedFileEnable = false;
        private boolean offsetCheckInSlave = false;
        private boolean debugLockEnable = false;
        private boolean duplicationEnable = false;
        private boolean diskFallRecorded = true;
        private long osPageCacheBusyTimeOutMills = 1000;
        private int defaultQueryMaxNum = 32;

        @ImportantField
        private boolean transientStorePoolEnable = false;
        private int transientStorePoolSize = 5;
        private boolean fastFailIfNoBufferInStorePool = false;

        private boolean enableDLegerCommitLog = false;
        private String dLegerGroup;
        private String dLegerPeers;
        private String dLegerSelfId;

        public boolean isDebugLockEnable() {
            return debugLockEnable;
        }

        public void setDebugLockEnable(final boolean debugLockEnable) {
            this.debugLockEnable = debugLockEnable;
        }

        public boolean isDuplicationEnable() {
            return duplicationEnable;
        }

        public void setDuplicationEnable(final boolean duplicationEnable) {
            this.duplicationEnable = duplicationEnable;
        }

        public long getOsPageCacheBusyTimeOutMills() {
            return osPageCacheBusyTimeOutMills;
        }

        public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) {
            this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
        }

        public boolean isDiskFallRecorded() {
            return diskFallRecorded;
        }

        public void setDiskFallRecorded(final boolean diskFallRecorded) {
            this.diskFallRecorded = diskFallRecorded;
        }

        public boolean isWarmMapedFileEnable() {
            return warmMapedFileEnable;
        }

        public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
            this.warmMapedFileEnable = warmMapedFileEnable;
        }

        public int getMapedFileSizeCommitLog() {
            return mapedFileSizeCommitLog;
        }

        public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) {
            this.mapedFileSizeCommitLog = mapedFileSizeCommitLog;
        }

        public int getMapedFileSizeConsumeQueue() {
            int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
            return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
        }

        public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
            this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
        }

        public boolean isEnableConsumeQueueExt() {
            return enableConsumeQueueExt;
        }

        public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) {
            this.enableConsumeQueueExt = enableConsumeQueueExt;
        }

        public int getMappedFileSizeConsumeQueueExt() {
            return mappedFileSizeConsumeQueueExt;
        }

        public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) {
            this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt;
        }

        public int getBitMapLengthConsumeQueueExt() {
            return bitMapLengthConsumeQueueExt;
        }

        public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) {
            this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt;
        }

        public int getFlushIntervalCommitLog() {
            return flushIntervalCommitLog;
        }

        public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
            this.flushIntervalCommitLog = flushIntervalCommitLog;
        }

        public int getFlushIntervalConsumeQueue() {
            return flushIntervalConsumeQueue;
        }

        public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
            this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
        }

        public int getPutMsgIndexHightWater() {
            return putMsgIndexHightWater;
        }

        public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
            this.putMsgIndexHightWater = putMsgIndexHightWater;
        }

        public int getCleanResourceInterval() {
            return cleanResourceInterval;
        }

        public void setCleanResourceInterval(int cleanResourceInterval) {
            this.cleanResourceInterval = cleanResourceInterval;
        }

        public int getMaxMessageSize() {
            return maxMessageSize;
        }

        public void setMaxMessageSize(int maxMessageSize) {
            this.maxMessageSize = maxMessageSize;
        }

        public boolean isCheckCRCOnRecover() {
            return checkCRCOnRecover;
        }

        public boolean getCheckCRCOnRecover() {
            return checkCRCOnRecover;
        }

        public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
            this.checkCRCOnRecover = checkCRCOnRecover;
        }

        public String getStorePathCommitLog() {
            return storePathCommitLog;
        }

        public void setStorePathCommitLog(String storePathCommitLog) {
            this.storePathCommitLog = storePathCommitLog;
        }

        public String getDeleteWhen() {
            return deleteWhen;
        }

        public void setDeleteWhen(String deleteWhen) {
            this.deleteWhen = deleteWhen;
        }

        public int getDiskMaxUsedSpaceRatio() {
            if (this.diskMaxUsedSpaceRatio < 10)
                return 10;

            if (this.diskMaxUsedSpaceRatio > 95)
                return 95;

            return diskMaxUsedSpaceRatio;
        }

        public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
            this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
        }

        public int getDeleteCommitLogFilesInterval() {
            return deleteCommitLogFilesInterval;
        }

        public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) {
            this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
        }

        public int getDeleteConsumeQueueFilesInterval() {
            return deleteConsumeQueueFilesInterval;
        }

        public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) {
            this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
        }

        public int getMaxTransferBytesOnMessageInMemory() {
            return maxTransferBytesOnMessageInMemory;
        }

        public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) {
            this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory;
        }

        public int getMaxTransferCountOnMessageInMemory() {
            return maxTransferCountOnMessageInMemory;
        }

        public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) {
            this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory;
        }

        public int getMaxTransferBytesOnMessageInDisk() {
            return maxTransferBytesOnMessageInDisk;
        }

        public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) {
            this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
        }

        public int getMaxTransferCountOnMessageInDisk() {
            return maxTransferCountOnMessageInDisk;
        }

        public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) {
            this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
        }

        public int getFlushCommitLogLeastPages() {
            return flushCommitLogLeastPages;
        }

        public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
            this.flushCommitLogLeastPages = flushCommitLogLeastPages;
        }

        public int getFlushConsumeQueueLeastPages() {
            return flushConsumeQueueLeastPages;
        }

        public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) {
            this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
        }

        public int getFlushCommitLogThoroughInterval() {
            return flushCommitLogThoroughInterval;
        }

        public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) {
            this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
        }

        public int getFlushConsumeQueueThoroughInterval() {
            return flushConsumeQueueThoroughInterval;
        }

        public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) {
            this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval;
        }

        public int getDestroyMapedFileIntervalForcibly() {
            return destroyMapedFileIntervalForcibly;
        }

        public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) {
            this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly;
        }

        public int getFileReservedTime() {
            return fileReservedTime;
        }

        public void setFileReservedTime(int fileReservedTime) {
            this.fileReservedTime = fileReservedTime;
        }

        public int getRedeleteHangedFileInterval() {
            return redeleteHangedFileInterval;
        }

        public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
            this.redeleteHangedFileInterval = redeleteHangedFileInterval;
        }

        public int getAccessMessageInMemoryMaxRatio() {
            return accessMessageInMemoryMaxRatio;
        }

        public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) {
            this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
        }

        public boolean isMessageIndexEnable() {
            return messageIndexEnable;
        }

        public void setMessageIndexEnable(boolean messageIndexEnable) {
            this.messageIndexEnable = messageIndexEnable;
        }

        public int getMaxHashSlotNum() {
            return maxHashSlotNum;
        }

        public void setMaxHashSlotNum(int maxHashSlotNum) {
            this.maxHashSlotNum = maxHashSlotNum;
        }

        public int getMaxIndexNum() {
            return maxIndexNum;
        }

        public void setMaxIndexNum(int maxIndexNum) {
            this.maxIndexNum = maxIndexNum;
        }

        public int getMaxMsgsNumBatch() {
            return maxMsgsNumBatch;
        }

        public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
            this.maxMsgsNumBatch = maxMsgsNumBatch;
        }

        public int getHaListenPort() {
            return haListenPort;
        }

        public void setHaListenPort(int haListenPort) {
            this.haListenPort = haListenPort;
        }

        public int getHaSendHeartbeatInterval() {
            return haSendHeartbeatInterval;
        }

        public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
            this.haSendHeartbeatInterval = haSendHeartbeatInterval;
        }

        public int getHaHousekeepingInterval() {
            return haHousekeepingInterval;
        }

        public void setHaHousekeepingInterval(int haHousekeepingInterval) {
            this.haHousekeepingInterval = haHousekeepingInterval;
        }

        public BrokerRole getBrokerRole() {
            return brokerRole;
        }

        public void setBrokerRole(BrokerRole brokerRole) {
            this.brokerRole = brokerRole;
        }

        public void setBrokerRole(String brokerRole) {
            this.brokerRole = BrokerRole.valueOf(brokerRole);
        }

        public int getHaTransferBatchSize() {
            return haTransferBatchSize;
        }

        public void setHaTransferBatchSize(int haTransferBatchSize) {
            this.haTransferBatchSize = haTransferBatchSize;
        }

        public int getHaSlaveFallbehindMax() {
            return haSlaveFallbehindMax;
        }

        public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
            this.haSlaveFallbehindMax = haSlaveFallbehindMax;
        }

        public FlushDiskType getFlushDiskType() {
            return flushDiskType;
        }

        public void setFlushDiskType(FlushDiskType flushDiskType) {
            this.flushDiskType = flushDiskType;
        }

        public void setFlushDiskType(String type) {
            this.flushDiskType = FlushDiskType.valueOf(type);
        }

        public int getSyncFlushTimeout() {
            return syncFlushTimeout;
        }

        public void setSyncFlushTimeout(int syncFlushTimeout) {
            this.syncFlushTimeout = syncFlushTimeout;
        }

        public String getHaMasterAddress() {
            return haMasterAddress;
        }

        public void setHaMasterAddress(String haMasterAddress) {
            this.haMasterAddress = haMasterAddress;
        }

        public String getMessageDelayLevel() {
            return messageDelayLevel;
        }

        public void setMessageDelayLevel(String messageDelayLevel) {
            this.messageDelayLevel = messageDelayLevel;
        }

        public long getFlushDelayOffsetInterval() {
            return flushDelayOffsetInterval;
        }

        public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
            this.flushDelayOffsetInterval = flushDelayOffsetInterval;
        }

        public boolean isCleanFileForciblyEnable() {
            return cleanFileForciblyEnable;
        }

        public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
            this.cleanFileForciblyEnable = cleanFileForciblyEnable;
        }

        public boolean isMessageIndexSafe() {
            return messageIndexSafe;
        }

        public void setMessageIndexSafe(boolean messageIndexSafe) {
            this.messageIndexSafe = messageIndexSafe;
        }

        public boolean isFlushCommitLogTimed() {
            return flushCommitLogTimed;
        }

        public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
            this.flushCommitLogTimed = flushCommitLogTimed;
        }

        public String getStorePathRootDir() {
            return storePathRootDir;
        }

        public void setStorePathRootDir(String storePathRootDir) {
            this.storePathRootDir = storePathRootDir;
        }

        public int getFlushLeastPagesWhenWarmMapedFile() {
            return flushLeastPagesWhenWarmMapedFile;
        }

        public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) {
            this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile;
        }

        public boolean isOffsetCheckInSlave() {
            return offsetCheckInSlave;
        }

        public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
            this.offsetCheckInSlave = offsetCheckInSlave;
        }

        public int getDefaultQueryMaxNum() {
            return defaultQueryMaxNum;
        }

        public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
            this.defaultQueryMaxNum = defaultQueryMaxNum;
        }

        /**
         * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
         * ASYNC_FLUSH
         *
         * @return true or false
         */
        public boolean isTransientStorePoolEnable() {
            return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
                && BrokerRole.SLAVE != getBrokerRole();
        }

        public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
            this.transientStorePoolEnable = transientStorePoolEnable;
        }

        public int getTransientStorePoolSize() {
            return transientStorePoolSize;
        }

        public void setTransientStorePoolSize(final int transientStorePoolSize) {
            this.transientStorePoolSize = transientStorePoolSize;
        }

        public int getCommitIntervalCommitLog() {
            return commitIntervalCommitLog;
        }

        public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) {
            this.commitIntervalCommitLog = commitIntervalCommitLog;
        }

        public boolean isFastFailIfNoBufferInStorePool() {
            return fastFailIfNoBufferInStorePool;
        }

        public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) {
            this.fastFailIfNoBufferInStorePool = fastFailIfNoBufferInStorePool;
        }

        public boolean isUseReentrantLockWhenPutMessage() {
            return useReentrantLockWhenPutMessage;
        }

        public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) {
            this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage;
        }

        public int getCommitCommitLogLeastPages() {
            return commitCommitLogLeastPages;
        }

        public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) {
            this.commitCommitLogLeastPages = commitCommitLogLeastPages;
        }

        public int getCommitCommitLogThoroughInterval() {
            return commitCommitLogThoroughInterval;
        }

        public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {
            this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
        }

        public String getdLegerGroup() {
            return dLegerGroup;
        }

        public void setdLegerGroup(String dLegerGroup) {
            this.dLegerGroup = dLegerGroup;
        }

        public String getdLegerPeers() {
            return dLegerPeers;
        }

        public void setdLegerPeers(String dLegerPeers) {
            this.dLegerPeers = dLegerPeers;
        }

        public String getdLegerSelfId() {
            return dLegerSelfId;
        }

        public void setdLegerSelfId(String dLegerSelfId) {
            this.dLegerSelfId = dLegerSelfId;
        }

        public boolean isEnableDLegerCommitLog() {
            return enableDLegerCommitLog;
        }

        public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
            this.enableDLegerCommitLog = enableDLegerCommitLog;
        }
    }

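    II. RocketMQ Source Code Analysis: Route Registration, Sending Heartbeats

    1. Route registration diagram:

    [Figure: route registration diagram]

    2. Route registration: sending heartbeats

    RocketMQ implements route registration through the heartbeat between the Broker and the NameServer. On startup, the Broker sends a heartbeat to every NameServer in the cluster, and then repeats this every 30 s. When a NameServer receives a heartbeat, it updates the lastUpdateTimestamp of the corresponding BrokerLiveInfo in its brokerLiveTable cache. Each NameServer also scans brokerLiveTable every 10 s; if no heartbeat has arrived from a Broker for 120 s, the NameServer removes that Broker's routing information and closes the socket connection.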
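
    The heartbeat bookkeeping described above can be sketched roughly as follows. This is a simplified illustration rather than RocketMQ's own code: the class LivenessTableSketch and its methods are hypothetical names, the table stores only a timestamp per broker address, and the current time is passed in explicitly so the expiry rule is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the NameServer's brokerLiveTable.
final class LivenessTableSketch {
    // 120 s expiry, matching the timeout described in the text.
    static final long EXPIRE_MILLIS = 120_000L;

    // brokerAddr -> time (ms) at which the last heartbeat was received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // Called when a heartbeat arrives: overwrite the previous timestamp.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called periodically (every 10 s in the text): remove brokers whose
    // last heartbeat is older than the expiry, and report which were removed.
    List<String> scanExpired(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        LivenessTableSketch table = new LivenessTableSketch();
        table.onHeartbeat("10.0.0.1:10911", 0L);
        table.onHeartbeat("10.0.0.2:10911", 0L);
        // Only the first broker keeps sending heartbeats.
        table.onHeartbeat("10.0.0.1:10911", 30_000L);
        System.out.println("expired: " + table.scanExpired(125_000L));
    }
}
```

    In this sketch the second broker is dropped at the 125 s scan because its only heartbeat is more than 120 s old, while the first broker survives thanks to its refresh at 30 s.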

    3. Code: BrokerController#start

    
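    // Register the broker with the NameServers
    this.registerBrokerAll(true, false, true);
    // Re-register periodically: first run after 10 s, then every registerNameServerPeriod ms, clamped to 10-60 s (30 s per the text above)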
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
                                                      TimeUnit.MILLISECONDS);
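
    The period passed to scheduleAtFixedRate in the snippet above is not used as configured; it is clamped into the range 10 s to 60 s. A minimal sketch of just that expression, with HeartbeatPeriodSketch and effectivePeriodMillis as hypothetical names introduced for illustration:

```java
// Hypothetical helper isolating the clamp used for the heartbeat period.
final class HeartbeatPeriodSketch {

    // Same expression as in the snippet: never below 10 s, never above 60 s.
    static int effectivePeriodMillis(int configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        for (int configured : new int[] {1000, 30000, 120000}) {
            System.out.println(configured + " ms -> " + effectivePeriodMillis(configured) + " ms");
        }
    }
}
```

    A configured value of 1000 ms is raised to 10000 ms, 30000 ms is kept as is, and 120000 ms is lowered to 60000 ms.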
    
    

    4. Code: BrokerOuterAPI#registerBrokerAll

    
    // Get the NameServer address list
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    // Proceed only if at least one NameServer is known
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    
        // Build the request header
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // Build the request body
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Register with each NameServer separately
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
    
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
    
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
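
    The fan-out pattern in registerBrokerAll (one task per NameServer, a thread-safe result list, and a CountDownLatch with a bounded wait) can be sketched in isolation as below. This is an illustrative sketch, not the RocketMQ implementation: FanOutRegisterSketch, registerAll, and the send function standing in for the remote call are all hypothetical, and the pool size of 4 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class FanOutRegisterSketch {

    // Sends one request per address in parallel and waits, with a timeout, for all of them.
    // `send` is a hypothetical stand-in for the remote call to a single NameServer.
    static List<String> registerAll(List<String> addrs, Function<String, String> send, long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (addrs.isEmpty()) {
            return results;
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(4, addrs.size()));
        CountDownLatch latch = new CountDownLatch(addrs.size());
        for (String addr : addrs) {
            executor.execute(() -> {
                try {
                    String result = send.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failure on one NameServer must not prevent the others from being tried.
                    System.err.println("register failed for " + addr + ": " + e.getMessage());
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        List<String> acks = registerAll(List.of("ns1:9876", "ns2:9876"), addr -> "ack from " + addr, 3000);
        System.out.println(acks.size() + " acknowledgements");
    }
}
```

    Counting down in the finally block is the important detail: the latch is released even when a single registration throws, so the caller never waits longer than the timeout.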
    

    5. Code: BrokerOuterAPI#registerBroker

    
    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    

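    III. RocketMQ Source Code Analysis: Route Registration, Handling the Request

    1. Route registration, request handling diagram:

    The network processor org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor parses the request type. If the type is REGISTER_BROKER, the request is forwarded to RouteInfoManager#registerBroker.

    [Figure: handling of the route registration request]

    2. Code: DefaultRequestProcessor#processRequest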

    
    // The request is a broker registration
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            // Brokers at V3_0_11 or later register together with their FilterServer list
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            // Register the broker
            return this.registerBroker(ctx, request);
        }
    

    3. Code: DefaultRequestProcessor#registerBroker

    
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );
    

    4. Code: RouteInfoManager#registerBroker

    Maintaining the routing information

    
    // Acquire the write lock
    this.lock.writeLock().lockInterruptibly();
    // Maintain clusterAddrTable
    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
    if (null == brokerNames) {
        brokerNames = new HashSet<String>();
        this.clusterAddrTable.put(clusterName, brokerNames);
    }
    brokerNames.add(brokerName);
    

    5. Maintaining brokerAddrTable

    
    // Maintain brokerAddrTable
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    // First registration: create the BrokerData
    if (null == brokerData) {
        registerFirst = true;
        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
        this.brokerAddrTable.put(brokerName, brokerData);
    }
    // Not the first registration: update the broker, removing any entry that maps this address to a different brokerId
    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
    Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> item = it.next();
        if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
            it.remove();
        }
    }
    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
    registerFirst = registerFirst || (null == oldAddr);
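
    The effect of the remove-then-put logic above is easiest to see when the same address re-registers under a different brokerId, for example if a former slave now reports brokerId 0. The sketch below isolates that step; BrokerAddrUpdateSketch and register are hypothetical names, and the map stands in for BrokerData's brokerAddrs.

```java
import java.util.HashMap;
import java.util.Map;

final class BrokerAddrUpdateSketch {

    // Registers brokerAddr under brokerId.
    // Returns true when this brokerId had no address before (a first registration for that id).
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        // Drop any entry that maps the same address to a different id,
        // so one address never appears under two ids at once.
        brokerAddrs.entrySet().removeIf(
            e -> e.getValue().equals(brokerAddr) && e.getKey() != brokerId);
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        register(addrs, 0L, "10.0.0.1:10911"); // master
        register(addrs, 1L, "10.0.0.2:10911"); // slave
        // The former slave now registers with brokerId 0.
        register(addrs, 0L, "10.0.0.2:10911");
        System.out.println(addrs);
    }
}
```

    After the third call the map holds a single entry, 0 mapped to 10.0.0.2:10911: the stale slave entry was removed first, and the put then replaced the old master address.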
    

    6. Maintaining topicQueueTable

    
    // Maintain topicQueueTable (only when the registering broker is the master)
    if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || 
            registerFirst) {
            ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
            if (tcTable != null) {
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                }
            }
        }
    }
    

    7. Code: RouteInfoManager#createAndUpdateQueueData

    
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        // Build the QueueData
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
        // Get the topic's queue list from topicQueueTable
        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        // No entry for this topic yet: create the list and add queueData directly
        if (null == queueDataList) {
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            // Determine whether queueData is a new entry
            boolean addNewOne = true;
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                // Same brokerName: an entry for this broker already exists
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                            queueData);
                        it.remove();
                    }
                }
            }
            // Add queueData when it is new or replaces a changed entry
            if (addNewOne) {
                queueDataList.add(queueData);
            }
        }
    }
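
    The update rule in createAndUpdateQueueData (keep an identical entry, replace a changed entry for the same broker, append an entry for a new broker) can be reproduced in a compact form. The sketch below is an assumption-laden restatement, not the original method: QueueDataUpdateSketch and QueueInfo are hypothetical types, QueueInfo carries only three fields, and removeIf replaces the explicit iterator loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class QueueDataUpdateSketch {

    // Simplified stand-in for QueueData with value-based equality.
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueInfo(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueInfo)) {
                return false;
            }
            QueueInfo other = (QueueInfo) o;
            return brokerName.equals(other.brokerName)
                && readQueueNums == other.readQueueNums
                && writeQueueNums == other.writeQueueNums;
        }

        @Override
        public int hashCode() {
            return Objects.hash(brokerName, readQueueNums, writeQueueNums);
        }
    }

    private final Map<String, List<QueueInfo>> topicQueueTable = new HashMap<>();

    void createOrUpdate(String topic, QueueInfo incoming) {
        List<QueueInfo> queues = topicQueueTable.computeIfAbsent(topic, t -> new ArrayList<>());
        // Remove this broker's entry if its settings changed.
        queues.removeIf(q -> q.brokerName.equals(incoming.brokerName) && !q.equals(incoming));
        // Add the incoming entry unless an identical one is already present.
        if (!queues.contains(incoming)) {
            queues.add(incoming);
        }
    }

    List<QueueInfo> queuesOf(String topic) {
        return topicQueueTable.getOrDefault(topic, Collections.<QueueInfo>emptyList());
    }

    public static void main(String[] args) {
        QueueDataUpdateSketch table = new QueueDataUpdateSketch();
        table.createOrUpdate("TopicTest", new QueueInfo("broker-a", 4, 4));
        table.createOrUpdate("TopicTest", new QueueInfo("broker-a", 4, 4)); // unchanged, ignored
        table.createOrUpdate("TopicTest", new QueueInfo("broker-a", 8, 8)); // changed, replaces
        System.out.println(table.queuesOf("TopicTest").size() + " entry for broker-a");
    }
}
```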
    

    8. Maintaining brokerLiveTable

    
    // Maintain brokerLiveTable: replace the entry with a fresh timestamp
    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
        System.currentTimeMillis(),
        topicConfigWrapper.getDataVersion(),
        channel,
        haServerAddr));
    

    9. Maintaining the FilterServer list (filterServerTable)

    
    // Maintain filterServerTable
    if (filterServerList != null) {
        if (filterServerList.isEmpty()) {
            this.filterServerTable.remove(brokerAddr);
        } else {
            this.filterServerTable.put(brokerAddr, filterServerList);
        }
    }
    
    // For a slave, return the master's address and its HA server address in the result
    if (MixAll.MASTER_ID != brokerId) {
        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (masterAddr != null) {
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
            if (brokerLiveInfo != null) {
                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                result.setMasterAddr(masterAddr);
            }
        }
    }
    

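    IV. RocketMQ Source Code Analysis: Route Deletion

    1. Route deletion

    The Broker sends a heartbeat to the NameServer every 30 s. The heartbeat carries the BrokerId, the Broker address, the Broker name, the name of the cluster the Broker belongs to, and the list of FilterServers associated with the Broker. If the Broker goes down, however, the NameServer stops receiving heartbeats, so how does it evict the failed Broker? The NameServer scans the brokerLiveTable status table every 10 s. If the lastUpdateTimestamp of a BrokerLiveInfo is more than 120 s behind the current time, the Broker is considered failed: the NameServer removes it, closes the connection to it, and updates topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable.

    2. RocketMQ has two triggers for deleting routing information

    • The NameServer periodically scans brokerLiveTable and compares the time of the last heartbeat with the current system time; if the gap exceeds 120 s, the broker is removed.
    • When a Broker shuts down normally, it executes the unregisterBroker command.

    Both triggers delete routes in the same way: everything related to that broker is removed from the relevant routing tables.

    [Figure: route deletion]

    3. Code: NamesrvController#initialize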

    
    // Scan for inactive brokers every 10 s (first scan after 5 s)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    

    4. Code: RouteInfoManager#scanNotActiveBroker

    
    public void scanNotActiveBroker() {
        // Iterate over brokerLiveTable
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // Was the last heartbeat received more than 120 s ago?
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                // Close the connection
                RemotingUtil.closeChannel(next.getValue().getChannel());
                // Remove the broker from brokerLiveTable
                it.remove();
                // Update the other routing tables
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
    

    5. Code: RouteInfoManager#onChannelDestroy

    
    // Acquire the write lock, then remove the broker address from brokerLiveTable and filterServerTable
    this.lock.writeLock().lockInterruptibly();
    this.brokerLiveTable.remove(brokerAddrFound);
    this.filterServerTable.remove(brokerAddrFound);
    

    6. Maintaining brokerAddrTable

    
    // Maintain brokerAddrTable
    String brokerNameFound = null;
    boolean removeBrokerName = false;
    Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();
    // Iterate over brokerAddrTable until the address is found
    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
        BrokerData brokerData = itBrokerAddrTable.next().getValue();
        // Iterate over this broker's addresses
        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> entry = it.next();
            Long brokerId = entry.getKey();
            String brokerAddr = entry.getValue();
            // Remove the entry whose address matches the broker being removed
            if (brokerAddr.equals(brokerAddrFound)) {
                brokerNameFound = brokerData.getBrokerName();
                it.remove();
                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
                break;
            }
        }
        // If this brokerName has no addresses left, remove it from brokerAddrTable
        if (brokerData.getBrokerAddrs().isEmpty()) {
            removeBrokerName = true;
            itBrokerAddrTable.remove();
            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
        }
    }
    

    7. Maintaining clusterAddrTable

    // Maintain clusterAddrTable
    if (brokerNameFound != null && removeBrokerName) {
        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
        // Iterate over clusterAddrTable
        while (it.hasNext()) {
            Entry<String, Set<String>> entry = it.next();
            // Cluster name
            String clusterName = entry.getKey();
            // Set of brokerNames in the cluster
            Set<String> brokerNames = entry.getValue();
            // Remove brokerNameFound from brokerNames
            boolean removed = brokerNames.remove(brokerNameFound);
            if (removed) {
                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                    brokerNameFound, clusterName);
    
                if (brokerNames.isEmpty()) {
                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                        clusterName);
                    // The cluster no longer contains any broker, so remove the cluster
                    it.remove();
                }
    
                break;
            }
        }
    }
    

    8. Maintaining the topicQueueTable queues

    
    // Maintain the topicQueueTable queues
    if (removeBrokerName) {
        // Iterate over topicQueueTable
        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
            this.topicQueueTable.entrySet().iterator();
        while (itTopicQueueTable.hasNext()) {
            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
            // Topic name
            String topic = entry.getKey();
            // Queue list of the topic
            List<QueueData> queueDataList = entry.getValue();
            // Iterate over the topic's queues
            Iterator<QueueData> itQueueData = queueDataList.iterator();
            while (itQueueData.hasNext()) {
                // Remove the queue data that belongs to the inactive broker
                QueueData queueData = itQueueData.next();
                if (queueData.getBrokerName().equals(brokerNameFound)) {
                    itQueueData.remove();
                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                        topic, queueData);
                }
            }
            // If the topic has no queues left, remove the topic
            if (queueDataList.isEmpty()) {
                itTopicQueueTable.remove();
                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                    topic);
            }
        }
    }
    

    9. Releasing the write lock

    // Release the write lock
    finally {
        this.lock.writeLock().unlock();
    }
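
    Steps 5 through 9 follow one pattern: take the write lock, cascade the removal through the tables, and release the lock in a finally block. A much reduced sketch of that pattern is shown below. RouteCleanupSketch, its two maps, and its methods are hypothetical; the reverse index from address to brokerName is a simplification, and lock() is used instead of lockInterruptibly() to keep the example short.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class RouteCleanupSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerAddr -> brokerName (hypothetical reverse index, for illustration only)
    private final Map<String, String> addrToBrokerName = new HashMap<>();
    // clusterName -> brokerNames (simplified clusterAddrTable)
    private final Map<String, Set<String>> clusterToBrokerNames = new HashMap<>();

    void register(String cluster, String brokerName, String addr) {
        lock.writeLock().lock();
        try {
            addrToBrokerName.put(addr, brokerName);
            clusterToBrokerNames.computeIfAbsent(cluster, c -> new HashSet<>()).add(brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void removeBroker(String addr) {
        lock.writeLock().lock();
        try {
            String brokerName = addrToBrokerName.remove(addr);
            if (brokerName == null) {
                return; // unknown address, nothing to clean up
            }
            if (addrToBrokerName.containsValue(brokerName)) {
                return; // another address still serves this brokerName
            }
            // Last address gone: drop the brokerName, then any cluster left empty.
            clusterToBrokerNames.values().forEach(names -> names.remove(brokerName));
            clusterToBrokerNames.values().removeIf(Set::isEmpty);
        } finally {
            // Always release the lock, even on early return or exception.
            lock.writeLock().unlock();
        }
    }

    boolean hasCluster(String cluster) {
        lock.readLock().lock();
        try {
            return clusterToBrokerNames.containsKey(cluster);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        RouteCleanupSketch routes = new RouteCleanupSketch();
        routes.register("DefaultCluster", "broker-a", "10.0.0.1:10911");
        routes.removeBroker("10.0.0.1:10911");
        System.out.println("cluster still present: " + routes.hasCluster("DefaultCluster"));
    }
}
```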
    

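    V. RocketMQ Source Code Analysis: Route Discovery and Summary

    1. Route discovery

    Route discovery in RocketMQ is not real-time. When a Topic's route changes, the NameServer does not push the change to clients; instead, each client periodically pulls the latest route for the topic.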
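
    The pull model can be sketched from the client side as follows. This is only an illustration of the idea: RoutePollerSketch, refreshOnce, and the fetchRoute function that stands in for a NameServer query are hypothetical, a route is reduced to a plain string, and the 100 ms interval is chosen for the demo only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class RoutePollerSketch {
    // topic -> locally cached route
    private final Map<String, String> localRoutes = new ConcurrentHashMap<>();
    // Hypothetical stand-in for asking a NameServer for a topic's route.
    private final Function<String, String> fetchRoute;

    RoutePollerSketch(Function<String, String> fetchRoute) {
        this.fetchRoute = fetchRoute;
    }

    // One polling round. Returns true if the cached route changed.
    boolean refreshOnce(String topic) {
        String latest = fetchRoute.apply(topic);
        if (latest == null) {
            return false; // keep the cached route when the lookup finds nothing
        }
        String previous = localRoutes.put(topic, latest);
        return !latest.equals(previous);
    }

    String cachedRoute(String topic) {
        return localRoutes.get(topic);
    }

    public static void main(String[] args) throws InterruptedException {
        RoutePollerSketch poller = new RoutePollerSketch(topic -> "broker-a@10.0.0.1:10911");
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // The client, not the NameServer, drives the refresh.
        scheduler.scheduleAtFixedRate(() -> poller.refreshOnce("TopicTest"), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        scheduler.shutdownNow();
        System.out.println(poller.cachedRoute("TopicTest"));
    }
}
```

    A consequence of this design is that a client keeps using its cached route until the next poll, so a route change on the NameServer becomes visible to it only after some delay.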

    2. Code: DefaultRequestProcessor#getRouteInfoByTopic

    
    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
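        // Call RouteInfoManager to fill TopicRouteData's List<QueueData>, List<BrokerData>, and filterServerTable from the routing tables topicQueueTable, brokerAddrTable, and filterServerTable
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        // If routing info is found for the topic and ordered messages are enabled, fetch the ordered-message configuration from the NameServer KVConfig and add it to the routing info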
        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }
    
            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }
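
    The lookup behind getRouteInfoByTopic amounts to joining the topic table with the broker address table and answering "not found" when the topic is unknown. The sketch below shows that join under strong simplifications: RouteLookupSketch and its methods are hypothetical, the topic table stores only broker names, and the FilterServer and ordered-message parts are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RouteLookupSketch {
    // topic -> names of the brokers hosting its queues (simplified topicQueueTable)
    private final Map<String, List<String>> topicToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> address) (simplified brokerAddrTable)
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    void register(String topic, String brokerName, long brokerId, String addr) {
        List<String> names = topicToBrokerNames.computeIfAbsent(topic, t -> new ArrayList<>());
        if (!names.contains(brokerName)) {
            names.add(brokerName);
        }
        brokerAddrTable.computeIfAbsent(brokerName, n -> new HashMap<>()).put(brokerId, addr);
    }

    // Returns brokerName -> addresses for the topic, or null when the topic is unknown.
    Map<String, Map<Long, String>> lookup(String topic) {
        List<String> names = topicToBrokerNames.get(topic);
        if (names == null) {
            return null;
        }
        Map<String, Map<Long, String>> route = new HashMap<>();
        for (String name : names) {
            Map<Long, String> addrs = brokerAddrTable.get(name);
            if (addrs != null) {
                route.put(name, new HashMap<>(addrs)); // copy, so callers cannot mutate the table
            }
        }
        return route;
    }

    public static void main(String[] args) {
        RouteLookupSketch table = new RouteLookupSketch();
        table.register("TopicTest", "broker-a", 0L, "10.0.0.1:10911");
        table.register("TopicTest", "broker-a", 1L, "10.0.0.2:10911");
        // Mirrors the two outcomes of the handler: SUCCESS or TOPIC_NOT_EXIST.
        System.out.println(table.lookup("TopicTest") != null ? "SUCCESS" : "TOPIC_NOT_EXIST");
        System.out.println(table.lookup("Unknown") != null ? "SUCCESS" : "TOPIC_NOT_EXIST");
    }
}
```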
    

    3. NameServer summary

    [Figure: NameServer summary]

  • Original article: https://blog.csdn.net/qfyh_djh/article/details/139805910