• 【Elasticsearch】ES选主流程分析


    Raft协议

    Raft是分布式系统中的一种共识算法,用于在集群中选举Leader管理集群。Raft协议中有以下角色:

    Leader(领导者):集群中的领导者,负责管理集群。

    Candidate(候选者):具有竞选Leader资格的角色,如果集群需要选举Leader,节点需要先转为候选者角色才可以发起竞选。

    Follower(跟随者 ):Leader的跟随者,接收和处理来自Leader的消息,与Leader之间保持通信,如果通信超时或者其他原因导致节点与Leader之间通信失败,节点会认为集群中没有Leader,就会转为候选者发起竞选,推荐自己成为Leader。

    Raft协议中还有一个Term(任期)的概念,任期是随着选举的举行而变化,一般是单调进行递增,比如说集群中当前的任期为1,此时某个节点发现集群中没有Leader,开始发起竞选,此时任期编号就会增加为2,表示进行了新一轮的选举。一般会为Term较大的那个节点进行投票,当某个节点收到的投票数达到了Quorum,一般是集群中的节点数/2 + 1,将会被选举为Leader。

    Elasticsearch选主

    Elasticsearch在7.0版本以前采用Bully算法进行选主,7.0以后使用了Raft协议,但没有完全按照Raft协议来实现,而是做了一些调整,ES选主流程如下:

    1. 节点的初始化状态为Candidate;

    2. 启动选举任务,向探测到的集群中其他节点发送PRE_VOTE投票请求,请求中会携带节点的Term信息;

    3. 其他节点收到PRE_VOTE投票请求后,对请求进行处理:

      (1)更新自己收到过的最大的Term

      如果请求中的Term比自己的Term大并且当前节点是Leader节点,意味着当前的Leader可能已经过期,其他节点已经开始竞选Leader,所以此时当前节点需要放弃Leader的身份,重新发起选举。

      (2)根据当前节点记录的Leader信息决定是否投票给发起者,然后向发起者返回投票响应信息:

      • 如果当前节点记录的集群Leader为空,同意投票给发起者。

      • 如果当前节点记录的集群Leader不为空,但是与本次发起的节点一致,同样同意投票。

      • 如果当前节点记录的集群Leader为空,但是与本次发起的节点不同,拒绝投票给发起者。

    4. 发起者收到其他节点对PRE_VOTE投票请求的响应,判断是否得到了大多数投票,如果是进入下一步;

    5. 发起者向集群中的节点发送StartJoin请求,邀请节点加入集群,发送StartJoin请求的时候会将Term增加1,但是发起者的Term暂不更新,这与Raft协议在发起选举的时候就对Term增加的操作不一样;

    6. 其他节点收到StartJoin请求,更新自己的Term信息,处理完毕后向发起者发送JOIN请求,JOIN请求中携带了节点的Term信息

      收到StartJoin请求时,只要请求中的Term比当前节点的Term大,当前节点都会同意为发起者进行投票,这里也与Raft协议规定的每个任期内只能为一个节点进行投票不一致。

      既然节点可以多次进行投票,那么就有可能产生多个Leader,对于这种情况,Elasticsearch会选择最后那个选举成功的节点成为Leader。

    7. 发起者收到其他节点发送的JOIN请求后,会统计收到的JOIN请求个数,如果达到了大多数投票,即可成为Leader;

      发起者收到JOIN请求时也会校验自己的Term是否比JOIN请求中的Term大,在第5步中发起者并未更新自己的Term,所以首次收到JOIN请求后,Term信息会小于JOIN请求中的Term,这里发起者会模拟一个JOIN请求给自己,也就是自己为自己投一票。

    8. 发起者成为Leader;

    ES选主存在的问题
    由于每个节点可以多次进行投票,有可能出现节点竞争激烈导致一直未选出leader的问题。关于问题的解决方案可以参考以下两篇文章:
    【张超】留意Elasticsearch 7.x 可能无法选主的问题

    【Guohang Huang】腾讯 Elasticsearch 7.x 大集群选主优化

    Elasticsearch选举流程分析

    在ES启动节点的时候,会调用Coordinator的startInitialJoin方法开启选举:

    // Node
    public class Node implements Closeable {
       public Node start() throws NodeValidationException {
           // ...
           // 启动集群选举
           coordinator.startInitialJoin();
           // ...
       }
    }
    
    // Coordinator
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        public void startInitialJoin() {
            synchronized (mutex) {
                // 先转为候选者
                becomeCandidate("startInitialJoin");
            }
            // 启动选举任务
            clusterBootstrapService.scheduleUnconfiguredBootstrap();
        }
    }
    

    成为候选节点

    becomeCandidate方法主要做一些Leader选举的前置工作:

    1. 判断节点的角色是否是候选者,因为Raft协议中候选者才可以发起leader选举,所以第一步需要把当前节点转为候选者节点;
    2. 初始化PreVoteCollector里面状态信息,它是一个二元组TupleDiscoveryNode记录了集群的leader节点,PreVoteResponse里面记录节点的Term信息,包括当前Term、上一次接受的Term(集群Term)和上一次接受的版本(集群版本),在投票选举的时候会用到;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    
        void becomeCandidate(String method) {
            // 判断是否持有锁
            assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
            logger.debug("{}: coordinator becoming CANDIDATE in term {} (was {}, lastKnownLeader was [{}])", method,
                getCurrentTerm(), mode, lastKnownLeader);
            // 如果不是CANDIDATE
            if (mode != Mode.CANDIDATE) {
                final Mode prevMode = mode;
                // 设置为CANDIDATE
                mode = Mode.CANDIDATE;
                cancelActivePublication("become candidate: " + method);
              
                //...
              
                // 如果之前是Leader
                if (prevMode == Mode.LEADER) {
                    // 清除Master相关信息
                    cleanMasterService();
                }
    
                // ...
            }
            // 更新PreVoteCollector里面记录的leader节点和Term信息,这里还没有选举出leader,所以传入的是null
            preVoteCollector.update(getPreVoteResponse(), null);
        }
    
        private PreVoteResponse getPreVoteResponse() {
            // 创建PreVoteResponse,记录当前Term、上一次接受的Term和上一次接受的版本
            return new PreVoteResponse(
                getCurrentTerm(),
                coordinationState.get().getLastAcceptedTerm(),
                coordinationState.get().getLastAcceptedState().version()
            );
        }
      
     
    }
    

    PreVoteCollector的二元组如下,DiscoveryNode为leader节点,PreVoteResponse记录了Term相关信息,其他节点发起选举时,返回给发起者的投票结果就是PreVoteResponse

    public class PreVoteCollector {
        // 二元组
        private volatile Tuple state; 
    
        public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
            logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
            // 初始化状态信息
            state = new Tuple<>(leader, preVoteResponse);
        }
    }
    

    Leader选举

    scheduleUnconfiguredBootstrap方法中,对节点是否有Master角色权限进行了判断,如果没有Master角色权限,直接返回终止选举,否则启动选举任务,获取集群中发现的节点,调用startBootstrap开始启动:

    public class ClusterBootstrapService {
        scheduleUnconfiguredBootstrap() {
            if (unconfiguredBootstrapTimeout == null) {
                return;
            }
            // Master角色权限校验
            if (transportService.getLocalNode().isMasterNode() == false) {
                return;
            }
            logger.info(
                "no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] "
                    + "unless existing master is discovered",
                unconfiguredBootstrapTimeout
            );
            // 执行启动任务
            transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
                @Override
                public void run() {
                    // 获取集群中发现的节点
                    final Set discoveredNodes = getDiscoveredNodes();
                    logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes);
                    // 启动
                    startBootstrap(discoveredNodes, emptyList());
                }
                // ...
            });
        }
    }
    

    启动选举

    startBootstrap方法中,首先判断探测到的集群节点discoveryNodes是否有Master角色权限,然后调用doBootstrap进行启动。

    doBootstrap方法中,创建了VotingConfiguration,然后调用votingConfigurationConsumer触发选举,并进行了异常捕捉,如果出现异常进行重试:

    public class ClusterBootstrapService {
        private void startBootstrap(Set discoveryNodes, List unsatisfiedRequirements) {
            // 判断发现的节点是否有Master角色权限
            assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
            assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;
            if (bootstrappingPermitted.compareAndSet(true, false)) {
                // 启动
                doBootstrap(
                    // 创建VotingConfiguration
                    new VotingConfiguration(
                        Stream.concat(
                            discoveryNodes.stream().map(DiscoveryNode::getId),
                            unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s)
                        ).collect(Collectors.toSet())
                    )
                );
            }
        }
      
        private void doBootstrap(VotingConfiguration votingConfiguration) {
            assert transportService.getLocalNode().isMasterNode();
            try {
                // 触发投票
                votingConfigurationConsumer.accept(votingConfiguration);
            } catch (Exception e) {
                logger.warn(() -> "exception when bootstrapping with " + votingConfiguration + ", rescheduling", e);
                // 如果出现异常,进行重试
                transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC, new Runnable() {
                    @Override
                    public void run() {
                        doBootstrap(votingConfiguration);
                    }
                    // ... 
                });
            }
        }
    
    }
    

    votingConfigurationConsumer是一个函数式编程接口,它接收一个表达式,在Coordinator的构造函数中可以看到对ClusterBootstrapService进行实例化时,传入的是setInitialConfiguration方法,所以votingConfigurationConsumer.accept(votingConfiguration)会执行CoordinatorsetInitialConfiguration方法:

    public class ClusterBootstrapService {
        // votingConfigurationConsumer
        private final Consumer votingConfigurationConsumer;
      
        public ClusterBootstrapService(
            Settings settings,
            TransportService transportService,
            Supplier> discoveredNodesSupplier,
            BooleanSupplier isBootstrappedSupplier,
            Consumer votingConfigurationConsumer
        ) {
           //...
           // 设置votingConfigurationConsumer
           this.votingConfigurationConsumer = votingConfigurationConsumer;
        }
    }
    
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    
        public Coordinator(
           // ...
        ) {
            // ...
            // 初始化ClusterBootstrapService
            this.clusterBootstrapService = new ClusterBootstrapService(
                settings,
                transportService,
                this::getFoundPeers,
                this::isInitialConfigurationSet,
                this::setInitialConfiguration  // 传入setInitialConfiguration方法
            );
            // ...
        }
    }
    

    setInitialConfiguration方法的处理逻辑如下:

    1. 首先进行一系列的校验,如果校验不通过不能进行选举:
      • 是否已经初始化过;
      • 当前节点是有Master角色权限;
      • 集群中的节点是否包含当前节点;
      • 集群中的节点个数是否达到了Quorum个;
    2. 调用preVoteCollector的update方法,更新当前节点记录的Leader节点和Term信息;
    3. 调用startElectionScheduler方法启动选举;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
            synchronized (mutex) {
                // 获取集群状态
                final ClusterState currentState = getStateForMasterService();
                // 判断是否初始化过
                if (isInitialConfigurationSet()) {
                    logger.debug("initial configuration already set, ignoring {}", votingConfiguration);
                    return false;
                }
                // 校验Master角色权限
                if (getLocalNode().isMasterNode() == false) {
                    logger.debug("skip setting initial configuration as local node is not a master-eligible node");
                    throw new CoordinationStateRejectedException(
                        "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"
                    );
                }
                // 如果节点ID中不包含当前节点的ID
                if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
                    logger.debug("skip setting initial configuration as local node is not part of initial configuration");
                    throw new CoordinationStateRejectedException("local node is not part of initial configuration");
                }
                // ...
                // 判断节点个数是否达到Quorum
                if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).toList()) == false) {
                    // ...
                    throw new CoordinationStateRejectedException(
                        "not enough nodes discovered to form a quorum in the initial configuration "
                            + "[knownNodes="
                            + knownNodes
                            + ", "
                            + votingConfiguration
                            + "]"
                    );
                }
                // ...
                // 更新
                preVoteCollector.update(getPreVoteResponse(), null); 
                // 开始选举 
                startElectionScheduler();
                return true;
            }
        }
    }
    

    发起选举

    startElectionScheduler方法用于启动选举任务,任务是异步执行的:

    1. 校验节点是否是CANDIDATE节点,如果是继续往下进行;
    2. 如果当前节点集群健康状态处于UNHEALTHY,直接返回;
    3. 调用PreVoteCollector的start方法发起投票;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
       private void startElectionScheduler() {
            assert electionScheduler == null : electionScheduler;
            // 校验Master角色权限
            if (getLocalNode().isMasterNode() == false) {
                return;
            }
            final TimeValue gracePeriod = TimeValue.ZERO;
            // 启动选举任务
            electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
                @Override
                public void run() {
                    synchronized (mutex) {
                        // 如果是CANDIDATE节点
                        if (mode == Mode.CANDIDATE) {
                            // 获取之前的集群状态
                            final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                            if (localNodeMayWinElection(lastAcceptedState) == false) {
                                logger.trace("skip prevoting as local node may not win election: {}", lastAcceptedState.coordinationMetadata());
                                return;
                            }
                            // 获取集群状态信息
                            final StatusInfo statusInfo = nodeHealthService.getHealth();
                            // 如果处于UNHEALTHY状态
                            if (statusInfo.getStatus() == UNHEALTHY) {
                                logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
                                return;
                            }
                            if (prevotingRound != null) {
                                prevotingRound.close();
                            }
                            // 发起投票
                            prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
                        }
                    }
                }
                // ...
            });
        }
    }
    

    PreVoteCollector的start方法中,创建了PreVotingRound,然后调用PreVotingRound的start的方法发起投票:

     public class PreVoteCollector {
        public Releasable start(final ClusterState clusterState, final Iterable broadcastNodes) {
            // 创建PreVotingRound
            PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());
            // 发起投票
            preVotingRound.start(broadcastNodes);
            return preVotingRound;
        }
    }
    

    发送PRE_VOTE投票请求

    PreVotingRoundPreVoteCollector的内部类,在start方法中,会遍历探测到的集群节点,然后进行遍历,向每一个节点发送PRE_VOTE投票请求,投票请求响应信息处理是在handlePreVoteResponse方法中处理的:

    public class PreVoteCollector {        
        private class PreVotingRound implements Releasable {
            PreVotingRound(final ClusterState clusterState, final long currentTerm) {
                // 集群状态
                this.clusterState = clusterState;
                // 构建投票请求
                preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm);
            }
    
            void start(final Iterable broadcastNodes) {
                logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
                // 遍历发现的节点,当前节点向每一个节点发送投票请求
                broadcastNodes.forEach(
                    // 发送PRE_VOTE请求
                    n -> transportService.sendRequest(
                        n,
                        REQUEST_PRE_VOTE_ACTION_NAME,
                        preVoteRequest,
                        new TransportResponseHandler() {
                            // ...
                            @Override
                            public void handleResponse(PreVoteResponse response) {
                                // 处理返回的响应
                                handlePreVoteResponse(response, n);
                            }
                            // ...
                        }
                    )
                );
            }
         }
    }
    

    节点对PRE_VOTE投票请求的处理

    PreVoteCollector的构造函数中可以看到,注册了REQUEST_PRE_VOTE_ACTION_NAME请求处理器,对PRE_VOTE请求的处理是调用handlePreVoteRequest方法进行的,处理完毕后调用sendResponse返回响应信息:

    public class PreVoteCollector {
        // 选举任务
        private final Runnable startElection;
        // 更新最大Term
        private final LongConsumer updateMaxTermSeen;
      
        PreVoteCollector(
            final TransportService transportService,
            final Runnable startElection,
            final LongConsumer updateMaxTermSeen,
            final ElectionStrategy electionStrategy,
            NodeHealthService nodeHealthService
        ) {
            this.transportService = transportService;
            this.startElection = startElection;
            this.updateMaxTermSeen = updateMaxTermSeen;
            this.electionStrategy = electionStrategy;
            this.nodeHealthService = nodeHealthService;
            // 注册PRE_VOTE请求处理器
            transportService.registerRequestHandler(
                REQUEST_PRE_VOTE_ACTION_NAME,
                Names.CLUSTER_COORDINATION,
                false,
                false,
                PreVoteRequest::new,
                (request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)) // 调用handlePreVoteRequest处理请求
            );
        }
    }
    

    handlePreVoteRequest之前,首先看Coordinator的构造函数对PreVoteCollector实例化时传入的参数,主要关注startElectionupdateMaxTermSeen,它们都是函数式编程接口,从实例化的代码中可以看到分别对应Coordinator的传入的startElectionupdateMaxTermSeen方法,在后面会用到这两个方法:

    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher { 
       public Coordinator(
       // ...
       ){
           // ...
           this.preVoteCollector = new PreVoteCollector(
                transportService,
                this::startElection, // 传入startElection方法,启动选举
                this::updateMaxTermSeen, // 传入updateMaxTermSeen,更新收到的最大Term
                electionStrategy,
                nodeHealthService
            );
            // ...
       }
    }
    

    handlePreVoteRequest方法处理逻辑如下:

    1. 对term进行比较,调用updateMaxTermSeen.accept()更新收到的最大Term;
    2. 获取当前节点记录的集群Leader节点和Term信息;
    3. 如果Leader节点为空,表示还没有Leader节点,返回响应同意发起投票的节点成为leader;
    4. 如果leader不为空,但是与发起请求的节点是同一个节点,同样支持发起请求的节点成为leader;
    5. 其他情况,表示已经存在leader,拒绝投票请求
    public class PreVoteCollector {
        
        private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
            // 比较Term,更新maxTermSeen
            updateMaxTermSeen.accept(request.getCurrentTerm());
            Tuple state = this.state;
            assert state != null : "received pre-vote request before fully initialised";
            // 获取当前节点记录的集群Leader节点
            final DiscoveryNode leader = state.v1();
            // 获取当前节点的Term信息
            final PreVoteResponse response = state.v2();
            // 获取健康状态
            final StatusInfo statusInfo = nodeHealthService.getHealth();
            // 如果当前节点的状态处于UNHEALTHY
            if (statusInfo.getStatus() == UNHEALTHY) {
                String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
                logger.debug(message);
                throw new NodeHealthCheckFailureException(message);
            }
            // 如果leader为空,表示还没有Leader节点,返回响应同意发起投票的节点成为leader
            if (leader == null) {
                return response;
            }
            // 如果leader不为空,但是与发起请求的节点是同一个节点,同样支持发起请求的节点成为leader
            if (leader.equals(request.getSourceNode())) {
                return response;
            }
            // 其他情况,表示已经存在leader,拒绝投票请求
            throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
        }
    }
    
    updateMaxTermSeen

    上面说过updateMaxTermSeen指向CoordinatorupdateMaxTermSeen方法,处理逻辑如下:

    1. 比较当前节点收到过的最大的Term与请求中的Term,选择较大的那个作为maxTermSeen的值进行更新;
    2. 如果当前节点是Leader并且请求中的Term大于当前节点的Term,表示当前节点的信息可能已经过期,需要放弃当前的Leader角色,重新发起选举
      • 调用ensureTermAtLeast检查Term,确保是最新的Term,在ensureTermAtLeast方法中会判断,如果当前节点Term小于请求中的Term将当前节点转为候选者;
      • 调用startElection方法重新进行选举;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        private void updateMaxTermSeen(final long term) {
            synchronized (mutex) {
                // 当前节点收到过的最大的Term与请求中的term,如果请求中的Term较大,maxTermSeen的值将被更新为请求中的Term的值
                maxTermSeen = Math.max(maxTermSeen, term);
                // 获取当前节点的term
                final long currentTerm = getCurrentTerm();
                // 如果当前节点是Leader并且maxTermSeen大于当前节点的Term,请求中的Term较大,这里maxTermSeen的值就是请求中的Term,所以也是在比较请求中的Term是否大于当前节点的Term
                if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
                    if (publicationInProgress()) {
                        logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
                    } else {
                        try {
                            logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
                            // 确保Term是最新
                            ensureTermAtLeast(getLocalNode(), maxTermSeen);
                            // 发起选举
                            startElection();
                        } catch (Exception e) {
                            logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
                            becomeCandidate("updateMaxTermSeen");
                        }
                    }
                }
            }
        }
    }
    
    ensureTermAtLeast

    ensureTermAtLeast方法中,判断当前节点的Term是否小于请求中的Term:

    • 如果是则创建StartJoinRequest然后调用joinLeaderInTerm方法,joinLeaderInTerm方法会返回一个JOIN信息;

      在集群选举Leader的时候,某个节点成为Leader之前,会向其他节点发送StartJoin请求,这里进行模拟发送,当前节点向自己发送一个StartJoinRequest进行处理,更新当前节点的Term信息,后面会详细讲解StartJoin请求的处理。

    • 如果不是,返回一个空的JOIN信息;

    joinLeaderInTerm方法中,会调用handleStartJoin处理StartJoin请求,它会更新当前节点Term信息为最新,之后判断当前节点是否是CANDIDATE,如果不是需要将节点转为CANDIDATE:

    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
            assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
            // 判断当前节点Term是否小于请求中的Term
            if (getCurrentTerm() < targetTerm) {
                // 调用joinLeaderInTerm
                return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
            }
            return Optional.empty();
        }
      
        private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
            synchronized (mutex) {
                logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
                final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
                lastJoin = Optional.of(join);
                peerFinder.setCurrentTerm(getCurrentTerm());
                // 如果不是CANDIDATE转为CANDIDATE
                if (mode != Mode.CANDIDATE) {
                    becomeCandidate("joinLeaderInTerm");
                    followersChecker.updateFastResponseState(getCurrentTerm(), mode);
                    preVoteCollector.update(getPreVoteResponse(), null);
                }
                return join;
            }
        }
    }
    

    PRE_VOTE响应处理

    发起者收到集群节点返回的PRE_VOTE请求响应时,在handlePreVoteResponse方法中进行处理:

    1. 同样调用updateMaxTermSeen更新当前节点收到的最大Term;
    2. 如果响应中的Term大于当前节点的Term, 或者Term相等但是版本号大于当前节点的版本号,直接返回不进行处理,否则进入下一步;
    3. 走到这里表示认同当前节点成为Leader节点,将得到的投票信息放入preVotesReceived
    4. 判断是否得到了大多数投票,也就是收到的投票数是否超过了Quorum,如果未超过直接返回,如果超过表示当前节点可以成为Leader;
    5. 通过startElection开始处理成为Leader前的操作;
    public class PreVoteCollector { 
        private class PreVotingRound implements Releasable {      
            private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
                if (isClosed.get()) {
                    logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
                    return;
                }
                // 处理最大Term
                updateMaxTermSeen.accept(response.getCurrentTerm());
                // 如果响应中的Term大于当前节点的Term, 或者Term相等但是版本号大于当前节点的版本号
                if (response.getLastAcceptedTerm() > clusterState.term()
                    || (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
                    logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
                    return;
                }
                // 记录得到的投票
                preVotesReceived.put(sender, response);
                // ...
                // 判断是否得到了大多数投票
                if (electionStrategy.isElectionQuorum(
                    clusterState.nodes().getLocalNode(),
                    localPreVoteResponse.getCurrentTerm(),
                    localPreVoteResponse.getLastAcceptedTerm(),
                    localPreVoteResponse.getLastAcceptedVersion(),
                    clusterState.getLastCommittedConfiguration(),
                    clusterState.getLastAcceptedConfiguration(),
                    voteCollection
                ) == false) {
                    logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
                    return;
                }
                // ...
                // 开始选举
                startElection.run();
            }
        }
    }
    

    成为Leader

    邀请节点加入集群

    在成为Leader前,需要向集群中的节点发送StartJoin请求,邀请节点加入集群:

    1. 创建StartJoin请求,请求中设置了Term信息,取当前节点的Term和收到过最大的Term中较大的那个值并加1
    2. 调用sendStartJoinRequest发送StartJoin请求;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher { 
       private void startElection() {
            synchronized (mutex) {
                // 是否是CANDIDATE
                if (mode == Mode.CANDIDATE) {
                    if (localNodeMayWinElection(getLastAcceptedState()) == false) {
                        logger.trace("skip election as local node may not win it: {}", getLastAcceptedState().coordinationMetadata());
                        return;
                    }
                    // 创建StartJoin请求,这里可以看到在请求中的Term,设置为最大Term + 1
                    final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
                    logger.debug("starting election with {}", startJoinRequest);
                    // 调用sendStartJoinRequest发送StartJoin请求
                    getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
                }
            }
       }
    }
    
    StartJoin请求发送

    StartJoin请求表示邀请节点加入集群信息,接收者收到请求后会向发起者发送JOIN请求表示进行加入,所以发起者对StartJoin的响应不需要做什么处理,等待接收者发送JOIN请求即可:

    public class JoinHelper {
        void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
            assert startJoinRequest.getSourceNode().isMasterNode()
                : "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
            // 发送START_JOIN请求
            transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() {
                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    // 什么也不处理
                    logger.debug("successful response to {} from {}", startJoinRequest, destination);
                }
    
                @Override
                public void handleException(TransportException exp) {
                    logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
                }
            });
        }
    }
    
    StartJoin请求处理

    JoinHelper的构造函数中,注册了START_JOIN请求处理器,在收到START_JOIN请求时,会调用joinLeaderInTerm处理,然后调用sendJoinRequest向发送者发送JOIN请求:

    public class JoinHelper {
        JoinHelper(
           // ...
        ) {
            // 注册START_JOIN_ACTION_NAME请求处理
            transportService.registerRequestHandler(
                START_JOIN_ACTION_NAME,
                Names.CLUSTER_COORDINATION,
                false,
                false,
                StartJoinRequest::new,
                (request, channel, task) -> {
                    final DiscoveryNode destination = request.getSourceNode();
                    // 发送join请求
                    sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request))); // 调用joinLeaderInTerm处理
                    channel.sendResponse(Empty.INSTANCE);
                }
            );
        }
    }
    
    joinLeaderInTerm

    joinLeaderInTerm方法用于处理StartJoin请求,返回一个Join对象并发送给发起者,发起者会根据返回的Join信息计算得到的票数,以此决定是否成为LeaderjoinLeaderInTerm处理逻辑如下:

    1. 调用handleStartJoin处理StartJoin请求,它会从请求中获取Term信息并更新到当前节点的CurrentTerm中,并返回Join对象,用于向发起者回复投票结果;
    2. 如果节点不是CANDIDATE,将节点转为CANDIDATE
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    
        private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
            synchronized (mutex) {
                logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
                // 处理StartJoin请求
                final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
                lastJoin = Optional.of(join);
                peerFinder.setCurrentTerm(getCurrentTerm());
                // 如果节点不是CANDIDATE,转为CANDIDATE
                if (mode != Mode.CANDIDATE) {
                    becomeCandidate("joinLeaderInTerm"); 
                } else {
                    followersChecker.updateFastResponseState(getCurrentTerm(), mode);
                    preVoteCollector.update(getPreVoteResponse(), null);
                }
                return join;
            }
        }
    }
    
    更新CurrentTerm

    在handleStartJoin方法中从请求中获取Term信息并更新到当前节点的CurrentTerm中

    1. 如果StartJoin请求中的Term小于或者等于当前节点的Term,抛出异常;
    2. 更新当前节点的CurrentTerm为StartJoin请求中的Term
    3. 返回一个Join对象,里面记录当前节点加入集群的信息,包括当前节点信息、发送startJoin请求的节点(选举为Leader的节点),当前节点的Term,当前节点上一次接受的Term、当前节点上一次接受的版本

    handleStartJoin方法中只要请求中的Term大于当前节点的Term,都会继续往下进行,最后返回一个Join对象,这意味着当前节点同意为发起者进行投票,也就是说Elasticsearch允许一个节点多次进行投票,并没有按照Raft协议中的规定每个任期内只能给一个节点投票。

    public class CoordinationState {
      
        public Join handleStartJoin(StartJoinRequest startJoinRequest) {
            // 如果StartJoin请求中的Term小于或者等于当前节点的Term,抛出异常
            if (startJoinRequest.getTerm() <= getCurrentTerm()) {
                logger.debug(
                    "handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
                    startJoinRequest,
                    getCurrentTerm()
                );
                throw new CoordinationStateRejectedException(
                    "incoming term " + startJoinRequest.getTerm() + " not greater than current term " + getCurrentTerm()
                );
            }
            logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);
            // ...
            // 更新当前节点的CurrentTerm
            persistedState.setCurrentTerm(startJoinRequest.getTerm());
            // 判断当前节点的Term是否与startJoin请求的一致
            assert getCurrentTerm() == startJoinRequest.getTerm();
            lastPublishedVersion = 0;
            lastPublishedConfiguration = getLastAcceptedConfiguration();
            startedJoinSinceLastReboot = true;
            electionWon = false;
            joinVotes = new VoteCollection();
            publishVotes = new VoteCollection();
            // 返回JOIN信息,包括当前节点、发送startJoin请求的节点、当前节点的Term、当前节点上一次接受的Term、当前节点上一次接受的版本
            return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion());
        }
    }
    

    节点加入集群

    向Leader发送JOIN请求

    StartJoin请求处理完毕后调用sendJoinRequest向发起者发送JOIN请求,表示加入集群:

    public class JoinHelper {
        public void sendJoinRequest(DiscoveryNode destination, long term, Optional optionalJoin) {
            assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
            final StatusInfo statusInfo = nodeHealthService.getHealth();
            // 如果处于UNHEALTHY状态不进行发送
            if (statusInfo.getStatus() == UNHEALTHY) {
                logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
                return;
            }
            // 构建JOIN请求体
            final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
            // ...
            if (pendingOutgoingJoins.putIfAbsent(dedupKey, pendingJoinInfo) == null) {
                logger.debug("attempting to join {} with {}", destination, joinRequest);
                pendingJoinInfo.message = PENDING_JOIN_CONNECTING;
                // 连接节点
                transportService.connectToNode(destination, new ActionListener<>() {
                    @Override
                    public void onResponse(Releasable connectionReference) {
                        // ...
                        clusterApplier.onNewClusterState(
                            "joining " + destination.descriptionWithoutAttributes(),
                            () -> null,
                            new ActionListener<>() {
                                @Override
                                public void onResponse(Void unused) {
                                    // ....
                                    pendingJoinInfo.message = PENDING_JOIN_WAITING_RESPONSE;
                                    // 发送JOIN请求
                                    transportService.sendRequest(
                                        destination,
                                        JOIN_ACTION_NAME,
                                        joinRequest,
                                        TransportRequestOptions.of(null, TransportRequestOptions.Type.PING),
                                        new TransportResponseHandler.Empty() {
                                            @Override
                                            public void handleResponse(TransportResponse.Empty response) {
                                                pendingJoinInfo.message = PENDING_JOIN_WAITING_STATE;
                                                pendingOutgoingJoins.remove(dedupKey);
                                                logger.debug("successfully joined {} with {}", destination, joinRequest);
                                                lastFailedJoinAttempt.set(null);
                                            }
                                            // ...
                                        }
                                    );
                                }
    
                                // ...
                            }
                        );
                    }
                    // ...
                });
    
            } else {
                logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
            }
        }
    }
    
    Leader处理Join请求

    JoinHelper的构造函数中,注册了JOIN请求处理器,是通过joinHandler来处理请求的,它同样是函数式编程接口,在Coordinator对JoinHelper进行实例化的时候,可以看到传入的是handleJoinRequest方法:

    public class JoinHelper {
        JoinHelper(
          // ...
          BiConsumer> joinHandler,
          // ...
        ) {
            // ...
            transportService.registerRequestHandler(
                JOIN_ACTION_NAME,
                Names.CLUSTER_COORDINATION,
                false,
                false,
                JoinRequest::new,
                (request, channel, task) -> joinHandler.accept(
                    request,
                    new ChannelActionListener(channel, JOIN_ACTION_NAME, request).map(ignored -> Empty.INSTANCE)
                )
            );
            // ...
        }
    }
    // Coordinator
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        public Coordinator(
           // ...
        ) {
            // ...
            this.joinHelper = new JoinHelper(
                allocationService,
                masterService,
                clusterApplier,
                transportService,
                this::getCurrentTerm,
                this::handleJoinRequest, // handleJoinRequest方法
                // ...
            );
            // ...
        }
    }
    

    Coordinator的handleJoinRequest方法中,会对发送JOIN的节点进行连接,进行JOIN请求验证:

    1. 先调用processJoinRequest处理收到的JOIN请求;
    2. 调用validateJoinRequest方法对JOIN请求进行验证;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        private void handleJoinRequest(JoinRequest joinRequest, ActionListener joinListener) {
            // ...
            // 连接节点
            transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
                @Override
                public void onResponse(Releasable response) {
                    boolean retainConnection = false;
                    try {
                        // 对JOIN请求进行验证
                        validateJoinRequest(
                            joinRequest,
                            ActionListener.runBefore(joinListener, () -> Releasables.close(response))
                                .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) // 处理请求
                        );
                        retainConnection = true;
                    } catch (Exception e) {
                        joinListener.onFailure(e);
                    } finally {
                        if (retainConnection == false) {
                            Releasables.close(response);
                        }
                    }
                }
               // ...
            });
        }
    }
    
    JOIN请求处理

    processJoinRequest处理逻辑如下:

    1. 调用updateMaxTermSeen更新收到最大的Term;
    2. 判断是否已经成功竞选为Leader,因为发起者会收到多个节点发送的JOIN请求,每次处理JOIN请求会判断是否获取了大多数投票,并将结果更新到CoordinationStateelectionWon变量中,为了不重复调用becomeLeader,这里先获取最近一次更新的值,记为prevElectionWon,用于判断后面是否需要调用becomeLeader成为Leader;
    3. 调用handleJoin进行处理,处理的时候会判断是否获取了大多数的投票,并更新CoordinationStateelectionWon的值;
    4. 再次从CoordinationState中获取electionWon值进行判断,如果prevElectionWon为false但是当前的electionWon为true,也就是之前未收到大多数投票的,但是处理当前的JOIN请求时达到了大多数投票,成功竞选为Leader,则调用becomeLeader成为Leader;
    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        private void processJoinRequest(JoinRequest joinRequest, ActionListener joinListener) {
            assert Transports.assertNotTransportThread("blocking on coordinator mutex and maybe doing IO to increase term");
            // 获取JOIN信息
            final Optional optionalJoin = joinRequest.getOptionalJoin();
            try {
                synchronized (mutex) {
                    // 更新最大Term
                    updateMaxTermSeen(joinRequest.getTerm());
                    // 获取集群协调状态
                    final CoordinationState coordState = coordinationState.get();
                    // 获取上一次的状态,是否成功选举为Leader
                    final boolean prevElectionWon = coordState.electionWon();
                    // 处理JOIN
                    optionalJoin.ifPresent(this::handleJoin);
                    joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinListener);
                    // 如果之前未成为Leader并且当前选举Leader成功
                    if (prevElectionWon == false && coordState.electionWon()) {
                        // 成为Leader
                        becomeLeader();
                    }
                }
            } catch (Exception e) {
                joinListener.onFailure(e);
            }
        }
    }
    

    接下来看下handleJoin的处理过程:

    1. 首先调用ensureTermAtLeast方法确保当前节点是最新的Term,ensureTermAtLeast前面已经讲过,会确保当前的节点Term是最新,如果已经是最新什么也不做,如果不是将创建StartJoinRequest然后调用joinLeaderInTerm方法,joinLeaderInTerm方法会返回一个JOIN信息,表示当前节点要加入一个集群的信息;

      在节点发送StartJoin请求时可知,对请求中的Term进行了加1但是节点自己的Term并未更新,所以首次收到发回的JOIN请求进入handleJoin时,JOIN请求中的Term会比当前节点的Term大1,那么ensureTermAtLeast就会返回一个JOIN信息,然后再次调用handleJoin处理JOIN请求,这里可以理解为节点向自己发了一个JOIN请求(通过创建JOIN对象的方式),给自己投一票

    2. 上面说过CoordinationStateelectionWon记录了是否已经选举为Leader,所以这里进行判断,如果已经被选举成为了Leader,调用handleJoinIgnoringExceptions处理JOIN请求,这个方法底层还是调用CoordinationStatehandleJoin进行处理,只不过在外层进行了异常捕捉,会忽略抛出的异常,因为节点之前已经成功选举了Leader,所以本次JION请求处理无关紧要,为了不让异常影响后续的流程,所以对异常进行一个捕捉;

    3. 如果还未成功选举为Leader,调用CoordinationStatehandleJoin处理请求,与第一步不一样的是这个不会对异常进行捕捉,因为此时还没成为Leader,如果有异常信息需要抛出;

    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        // 获取CoordinationState
        private final SetOnce coordinationState = new SetOnce<>(); 
        private void handleJoin(Join join) {
            synchronized (mutex) {
                // 确保Term最新,如果不是最新,会返回一个JOIN对象,调用handleJoin进行处理,这里可以理解为节点给自己投了一票
                ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);
                // 如果已经被选举为Leader
                if (coordinationState.get().electionWon()) {
                    // 调用对异常进行捕捉的handleJoin方法
                    final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);
                    final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
                    if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
                        scheduleReconfigurationIfNeeded();
                    }
                } else { // 如果还未为成为Leader
                    // CoordinationState的handleJoin处理请求
                    coordinationState.get().handleJoin(join); 
                }
            }
        }
      
        private boolean handleJoinIgnoringExceptions(Join join) {
            try {
                // CoordinationState的handleJoin处理请求
                return coordinationState.get().handleJoin(join);
            } catch (CoordinationStateRejectedException e) {
                logger.debug(() -> "failed to add " + join + " - ignoring", e);
                return false;
            }
        }
    }
    

    在CoordinationState的handleJoin中,首先会对Term和版本信息进行一系列的校验,如果校验通过,记录收到的JOIN请求个数,表示当前已经成功收到的投票数,然后调用isElectionQuorum判断是否获得了大多数的投票,也就是获得的投票数达到了Quorum,并将值更新到electionWon中:

    public class CoordinationState { 
        public boolean handleJoin(Join join) {
            assert join.targetMatches(localNode) : "handling join " + join + " for the wrong node " + localNode;
            // 如果收到的JOIN请求Term与当前节点的Term不一致抛出异常
            if (join.getTerm() != getCurrentTerm()) {
                logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])", getCurrentTerm(), join.getTerm());
                throw new CoordinationStateRejectedException(
                    "incoming term " + join.getTerm() + " does not match current term " + getCurrentTerm()
                );
            }
            // ...
            // 获取上一次的Term
            final long lastAcceptedTerm = getLastAcceptedTerm();
            // 如果请求中的上一次接受的Term大于当前节点的lastAcceptedTerm,抛出异常
            if (join.getLastAcceptedTerm() > lastAcceptedTerm) {
                logger.debug( "handleJoin: ignored join as joiner has a better last accepted term (expected: <=[{}], actual: [{}])", lastAcceptedTerm, join.getLastAcceptedTerm());
                throw new CoordinationStateRejectedException( "incoming last accepted term "
                        + join.getLastAcceptedTerm() + " of join higher than current last accepted term "
                        + lastAcceptedTerm
                );
            }
            // 对比版本
            if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersion()) {
                logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}]) in term {}", getLastAcceptedVersion(), join.getLastAcceptedVersion(), lastAcceptedTerm);
                throw new CoordinationStateRejectedException("incoming last accepted version "
                        + join.getLastAcceptedVersion() + " of join higher than current last accepted version "
                        + getLastAcceptedVersion() + " in term " + lastAcceptedTerm);
            }
            // ...
            // 记录JOIN投票
            boolean added = joinVotes.addJoinVote(join);
            boolean prevElectionWon = electionWon;
            // 判断是否得到了大多数投票,这里会更新electionWon的值
            electionWon = isElectionQuorum(joinVotes);
            assert prevElectionWon == false || electionWon : // we cannot go from won to not won
                "locaNode= " + localNode + ", join=" + join + ", joinVotes=" + joinVotes;
            logger.debug(
                "handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join, join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()
            );
            // 如果得到了大多数投票并且上一次没有选举为Leader
            if (electionWon && prevElectionWon == false) {
                logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes;
                lastPublishedVersion = getLastAcceptedVersion();
            }
            return added;
        }
    }
    

    转为Leader

    当节点收到了大多数投票后,就会调用becomeLeader转为Leader,这里会将节点由CANDIDATE转为LEADER角色,然后调用preVoteCollector的update更新Term和Leader节点信息:

    public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
        private void becomeLeader() {
            assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
            // 是否是CANDIDATE
            assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
            // 是否有Master角色权限
            assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not master-eligible";
            logger.debug("handleJoinRequest: coordinator becoming LEADER in term {} (was {}, lastKnownLeader was [{}])", getCurrentTerm(), mode,lastKnownLeader);
            // 转为Leader
            mode = Mode.LEADER;
            joinAccumulator.close(mode);
            // 设置为LeaderJoinAccumulator
            joinAccumulator = joinHelper.new LeaderJoinAccumulator();
            lastKnownLeader = Optional.of(getLocalNode());
            peerFinder.deactivate(getLocalNode());
            clusterFormationFailureHelper.stop();
            closePrevotingAndElectionScheduler();
            // 更新Leader信息和Term信息
            preVoteCollector.update(getPreVoteResponse(), getLocalNode());
            assert leaderChecker.leader() == null : leaderChecker.leader();
            followersChecker.updateFastResponseState(getCurrentTerm(), mode);
        }
    }
    

    参考

    【张超】深入理解 Elasticsearch 7.x 新的集群协调层

    【陈亮亮】Elasticsearch 新版选主流程

    【政采云技术团队】Elasticsearch系列之二选主7.x之后

    Elasticsearch版本:8.3

  • 相关阅读:
    增强AWS ECS监控:批量生产监控设置
    c语言练习题82:顺序表的使用
    LeetCode每日一题(963. Minimum Area Rectangle II)
    【XInput】手柄模拟鼠标运作之 .NET P/Invoke 和 UWP-API 方案
    realtek高清晰音频管理器怎么关闭的方法
    一起Talk Android吧(第三百七十三回:多线程版Timer)
    画一条0.5px的线、设置小于12px的字体、解决 1px 问题
    会员营销体系构建需要满足的四个条件
    单项冠军企业申报条件及奖励政策
    小龙虾优化算法COA求解不闭合SD-MTSP,可以修改旅行商个数及起点(提供MATLAB代码)
  • 原文地址:https://www.cnblogs.com/shanml/p/16684887.html