• ZooKeeper source code (06): ZooKeeperServer and its subclasses


    ZooKeeperServer

    Implements the standalone ZooKeeper server. Its subclasses add the richer distributed (cluster) functionality:

    ZooKeeperServer
      |-- QuorumZooKeeperServer
        |-- LeaderZooKeeperServer
        |-- LearnerZooKeeperServer
          |-- FollowerZooKeeperServer
          |-- ObserverZooKeeperServer
      |-- ReadOnlyZooKeeperServer
    

    Main fields

    // default value of the tickTime parameter
    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    // -1 means the default, tickTime * 2
    protected int minSessionTimeout = -1;
    // -1 means the default, tickTime * 20
    protected int maxSessionTimeout = -1;
    
    // session tracking
    protected SessionTracker sessionTracker;
    
    // storage components
    private FileTxnSnapLog txnLogFactory = null;
    private ZKDatabase zkDb;
    
    // response caches
    private ResponseCache readResponseCache;
    private ResponseCache getChildrenResponseCache;
    
    // set to the latest processed zxid (lastZxid) during startup
    private final AtomicLong hzxid = new AtomicLong(0);
    // entry point of the request processor chain
    protected RequestProcessor firstProcessor;
    // changes that have been prepared but not yet applied to the DataTree
    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
    
    protected ServerCnxnFactory serverCnxnFactory;
    protected ServerCnxnFactory secureServerCnxnFactory;
    
    // parameters used to decide whether a request is "large"
    private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
    private volatile int largeRequestThreshold = -1;
    

    Main methods

    Method signatures

    // remove the connection's watchers from the dataTree via zkDb
    void removeCnxn(ServerCnxn cnxn);
    
    // create zkDb (if it is null), then call loadData to load the data
    public void startdata() throws IOException, InterruptedException;
    // load data, clean up sessions, take a snapshot
    public void loadData() throws IOException, InterruptedException;
    // save a snapshot of the current zkDb
    public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere,
                                          boolean fastForwardFromEdits) throws IOException;
    
    // parse data from the given input stream and build a new zkDb and SessionTracker
    public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;
    
    // truncate the transaction log to the given zxid via zkDb.truncateLog(zxid)
    public void truncateLog(long zxid) throws IOException;
    // read dataTree.lastProcessedZxid via zkDb
    public long getLastProcessedZxid();
    
    // close a session by submitting a closeSession Request
    private void close(long sessionId);
    // kill a session via zkDb
    protected void killSession(long sessionId, long zxid);
    // start the server components
    private void startupWithServerState(State state);
    // create the RequestProcessors that handle requests
    protected void setupRequestProcessors();
    // create the SessionTracker
    protected void createSessionTracker();
    
    // generate a password for the given session
    byte[] generatePasswd(long id);
    // verify a session password
    protected boolean checkPasswd(long sessionId, byte[] passwd);
    // create a session with sessionTracker, generate its password and submit a createSession request
    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
    // bind an owner to the given session
    public void setOwner(long id, Object owner) throws SessionExpiredException;
    // validate the session, then call finishSessionInit to either keep the connection or close it
    protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
    public void finishSessionInit(ServerCnxn cnxn, boolean valid);
    // checkPasswd->revalidateSession->finishSessionInit
    public void reopenSession(ServerCnxn cnxn, long sessionId,
                              byte[] passwd, int sessionTimeout) throws IOException;
    
    // hand the request to requestThrottler, which later calls submitRequestNow
    public void enqueueRequest(Request si);
    // process the request with firstProcessor
    public void submitRequestNow(Request si);
    
    // handle a connect request; called by the network IO layer
    public void processConnectRequest(
        ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException;
    // handle a regular (business) request; called by the network IO layer
    public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
    // SASL authentication
    private void processSasl(
        RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;
    
    // process transactions
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
    public ProcessTxnResult processTxn(Request request);
    private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
    private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest);
    
    // Grant or deny authorization to an operation on a node
    public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids,
                         String path, List<ACL> setAcls) throws KeeperException.NoAuthException;
    // Check whether a path has exceeded its quota
    public void checkQuota(String path, byte[] lastData, byte[] data,
                           int type) throws KeeperException.QuotaExceededException;
    private void checkQuota(String lastPrefix, long bytesDiff, long countDiff,
                            String namespace) throws KeeperException.QuotaExceededException;
    
    // get the parent path
    private String parentPath(String path) throws KeeperException.BadArgumentsException;
    // get the effective ACL path from the Request
    private String effectiveACLPath(
        Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException;
    // determine the permission type required by the Request
    private int effectiveACLPerms(Request request);
    // check write permission
    public boolean authWriteRequest(Request request);
    

    loadData method

    Loads data, cleans up sessions and takes a snapshot:

    public void loadData() throws IOException, InterruptedException {
        // initialize zxid
        if (zkDb.isInitialized()) {
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        } else {
            setZxid(zkDb.loadDataBase());
        }
    
        // clean up dead sessions (sessions with no recorded timeout) via killSession
        zkDb.getSessions().stream()
                        .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
                        .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));
    
        // take a snapshot
        // txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap)
        takeSnapshot();
    }
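    The dead-session filter in loadData can be tried in isolation. The sketch below is a hypothetical helper (DeadSessionSketch is not a ZooKeeper class) that applies the same predicate: a session returned by getSessions() that has no entry in the session-timeout map is treated as dead. As I read the source, getSessions() returns the sessions that still own ephemeral nodes, so these are ephemeral owners whose session did not survive; treat that reading as an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper mirroring the filter used in loadData().
class DeadSessionSketch {
    // A session is dead if it has no entry in the sessionId -> timeout map.
    static List<Long> findDeadSessions(Set<Long> sessions, Map<Long, Integer> sessionsWithTimeouts) {
        return sessions.stream()
                .filter(session -> sessionsWithTimeouts.get(session) == null)
                .sorted() // deterministic order, only for readability
                .collect(Collectors.toList());
    }
}
```

    In loadData each id produced this way is passed to killSession together with the last processed zxid.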
    

    killSession method

    protected void killSession(long sessionId, long zxid) {
        // the session's ephemeral nodes are cleaned up here
        zkDb.killSession(sessionId, zxid);
        if (sessionTracker != null) {
            // remove the session's tracking information
            sessionTracker.removeSession(sessionId);
        }
    }
    

    startupWithServerState method

    private void startupWithServerState(State state) {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        // create the RequestProcessors that handle requests
        setupRequestProcessors();
    
        // a rate-limiting (throttling) component, not analyzed here
        startRequestThrottler();
    
        registerJMX();
        startJvmPauseMonitor();
        registerMetrics();
    
        setState(state);
    
        requestPathMetricsCollector.start();
    
        localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
    
        notifyAll();
    }
    

    setupRequestProcessors method (important)

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }
    

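    RequestProcessor interface: transactions are processed by a chain of processors, and requests are always processed in order. The standalone server, followers and the leader each use a different processor chain. A request is passed to the next RequestProcessor through the processRequest method, which is usually called by a single thread. When shutdown is called, a RequestProcessor should also shut down any other RequestProcessor it is connected to.

    FinalRequestProcessor: applies the transaction associated with a request, serves queries and sends the response to the client. It sits at the end of the RequestProcessor chain.

    SyncRequestProcessor: logs requests to disk, batching them so that IO is done efficiently. A request is not passed to the next RequestProcessor until its log has been synced to disk. SyncRequestProcessor is used in 3 different cases:

    • Leader - syncs requests to disk and forwards them to AckRequestProcessor, which sends an ACK back to the leader itself
    • Follower - syncs requests to disk and forwards them to SendAckRequestProcessor, which sends ACKs to the leader
    • Observer - syncs committed requests, received as INFORM packets, to disk. It does not send ACKs to the leader, so nextProcessor is null

    PrepRequestProcessor: usually at the start of the RequestProcessor chain; it sets up the transactions associated with requests that change the state of the system.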
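    The standalone chain PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor is a chain of responsibility built back to front. The sketch below is a hypothetical, synchronous model of that wiring (MiniProcessor, MiniPrep, MiniSync and MiniFinal are made-up names); the real processors run their own threads and queues, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ZooKeeper's RequestProcessor interface.
interface MiniProcessor {
    void processRequest(String request);
}

// Plays the role of PrepRequestProcessor: prepares the request, then forwards it.
class MiniPrep implements MiniProcessor {
    private final MiniProcessor next;
    MiniPrep(MiniProcessor next) { this.next = next; }
    public void processRequest(String request) {
        next.processRequest("prep(" + request + ")");
    }
}

// Plays the role of SyncRequestProcessor: "logs" the request before forwarding it.
class MiniSync implements MiniProcessor {
    final List<String> log = new ArrayList<>();
    private final MiniProcessor next;
    MiniSync(MiniProcessor next) { this.next = next; }
    public void processRequest(String request) {
        log.add(request);             // persist first...
        next.processRequest(request); // ...then hand it to the next processor
    }
}

// Plays the role of FinalRequestProcessor: end of the chain, has no next processor.
class MiniFinal implements MiniProcessor {
    final List<String> applied = new ArrayList<>();
    public void processRequest(String request) { applied.add(request); }
}
```

    Usage follows the same order as setupRequestProcessors: create the final processor first, wrap it in the sync processor, then make the prep processor the entry point (firstProcessor).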

    createSessionTracker method

    protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime,
                                                createSessionTrackerServerId, getZooKeeperServerListener());
    }
    

    Different subclasses use different SessionTracker implementations:

    • LeaderZooKeeperServer - LeaderSessionTracker
    • LearnerZooKeeperServer - LearnerSessionTracker

    createSession method

    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        if (passwd == null) {
            passwd = new byte[0];
        }
        // create a session
        long sessionId = sessionTracker.createSession(timeout);
        // generate the session password
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        // submit a createSession request, which will be handled by the RequestProcessor chain
        CreateSessionTxn txn = new CreateSessionTxn(timeout);
        cnxn.setSessionId(sessionId);
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
        submitRequest(si);
        return sessionId;
    }
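    Because the password is produced by a Random seeded with sessionId ^ superSecret, the server never has to store it: it can regenerate it and compare. The sketch below models generatePasswd/checkPasswd under that idea. The secret value and the 16-byte length are assumptions for illustration, and SessionPasswdSketch is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical model of deterministic session passwords.
class SessionPasswdSketch {
    // Assumed server-side secret, for illustration only.
    private static final long SUPER_SECRET = 0xB3415C00L;
    // Assumed password length.
    private static final int PASSWD_LEN = 16;

    // Same seed (sessionId ^ secret) always yields the same bytes.
    static byte[] generatePasswd(long sessionId) {
        Random r = new Random(sessionId ^ SUPER_SECRET);
        byte[] p = new byte[PASSWD_LEN];
        r.nextBytes(p);
        return p;
    }

    // Re-derive the password and compare; session id 0 means "no session".
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }
}
```

    This is why reopenSession can verify a reconnecting client with only its sessionId and the password it presents.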
    

    submitRequestNow method

    public void submitRequestNow(Request si) {
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                // process the request with firstProcessor
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            // Update request accounting/throttling limits
            requestFinished(si);
        } catch (RequestProcessorException e) {
            // Update request accounting/throttling limits
            requestFinished(si);
        }
    }
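    Requests reach submitRequestNow in two stages: enqueueRequest hands them to requestThrottler, which releases them later. The sketch below is a hypothetical, single-threaded model of that idea only (ThrottleSketch, maxInProcess and onRequestDone are made-up names); the real RequestThrottler runs in its own thread and has more limits than shown here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of enqueueRequest -> throttler -> submitRequestNow.
class ThrottleSketch {
    private final int maxInProcess;
    private final Deque<String> queue = new ArrayDeque<>();
    private int inProcess = 0;
    final List<String> submitted = new ArrayList<>();

    ThrottleSketch(int maxInProcess) { this.maxInProcess = maxInProcess; }

    // Like enqueueRequest: queue the request, then let through what the limit allows.
    void enqueueRequest(String request) {
        queue.add(request);
        drain();
    }

    // Like submitRequestNow: hand the request to firstProcessor and count it (incInProcess).
    private void submitRequestNow(String request) {
        submitted.add(request);
        inProcess++;
    }

    // Hypothetical completion hook: frees a slot and releases queued requests.
    void onRequestDone() {
        inProcess--;
        drain();
    }

    private void drain() {
        while (inProcess < maxInProcess && !queue.isEmpty()) {
            submitRequestNow(queue.poll());
        }
    }
}
```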
    

    processConnectRequest method

    public void processConnectRequest(
            ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
    
        long sessionId = request.getSessionId();
        // (omitted)
    
        if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
            // the client has seen a newer zxid than this server has processed
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
        }
        int sessionTimeout = request.getTimeOut();
        byte[] passwd = request.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the session is setup
        cnxn.disableRecv();
        if (sessionId == 0) {
            // create a new session
            long id = createSession(cnxn, passwd, sessionTimeout);
        } else {
            validateSession(cnxn, sessionId); // no-op in ZooKeeperServer
            // close the old ServerCnxn that belongs to this session
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            cnxn.setSessionId(sessionId);
            // reopen the existing session on the new connection
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        }
    }
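    The timeout negotiation in processConnectRequest is a clamp into [minSessionTimeout, maxSessionTimeout], where a configured value of -1 falls back to tickTime * 2 and tickTime * 20 as noted in the field list. The helper below is a hypothetical standalone version of that arithmetic (SessionTimeoutSketch is not a ZooKeeper class).

```java
// Hypothetical helper reproducing the session-timeout clamping.
class SessionTimeoutSketch {
    static int clamp(int requested, int tickTime, int minSessionTimeout, int maxSessionTimeout) {
        // -1 means "use the default derived from tickTime"
        int min = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
        int max = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
        int timeout = requested;
        if (timeout < min) {
            timeout = min;
        }
        if (timeout > max) {
            timeout = max;
        }
        return timeout;
    }
}
```

    For example, with tickTime = 2000 and both bounds left at -1, a client asking for 1000 ms gets 4000 ms and one asking for 100000 ms gets 40000 ms.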
    

    processPacket method

    public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
        cnxn.incrOutstandingAndCheckThrottle(h);
    
        if (h.getType() == OpCode.auth) {
            AuthPacket authPacket = request.readRecord(AuthPacket::new);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            // authenticate, then continue or close the connection (omitted)
            return;
        } else if (h.getType() == OpCode.sasl) {
            processSasl(request, cnxn, h);
        } else {
            if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                return;
            } else {
                Request si = new Request(
                    cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
                int length = request.limit();
                if (isLargeRequest(length)) { // check whether this is a large request
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                // submit the request; it will eventually be processed by firstProcessor
                submitRequest(si);
            }
        }
    }
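    The large-request check uses the two fields largeRequestThreshold and largeRequestMaxBytes from the field list. The sketch below is a hypothetical model of how I understand it: a threshold of -1 disables the feature, and a large request is rejected if admitting it would push the total bytes of in-flight large requests past largeRequestMaxBytes. LargeRequestSketch and the accept/finish hooks are assumed names, not ZooKeeper API.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the large-request admission check.
class LargeRequestSketch {
    private final int largeRequestThreshold; // -1 disables the check
    private final int largeRequestMaxBytes;  // budget for all in-flight large requests
    private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);

    LargeRequestSketch(int largeRequestThreshold, int largeRequestMaxBytes) {
        this.largeRequestThreshold = largeRequestThreshold;
        this.largeRequestMaxBytes = largeRequestMaxBytes;
    }

    boolean isLargeRequest(int length) {
        if (largeRequestThreshold == -1) {
            return false;
        }
        return length > largeRequestThreshold;
    }

    // Reject a large request if it would exceed the remaining budget.
    void checkRequestSizeWhenMessageReceived(int length) throws IOException {
        if (!isLargeRequest(length)) {
            return;
        }
        if (currentLargeRequestBytes.get() + length > largeRequestMaxBytes) {
            throw new IOException("Rejecting large request");
        }
    }

    // Hypothetical hooks: charge the budget on admission, release it on completion.
    void onLargeRequestAccepted(int length) { currentLargeRequestBytes.addAndGet(length); }
    void onLargeRequestFinished(int length) { currentLargeRequestBytes.addAndGet(-length); }
}
```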
    

    processTxn-related methods

    // entry point for quorum/Learner.java
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        processTxnForSessionEvents(null, hdr, txn);
        return processTxnInDB(hdr, txn, null);
    }
    
    // entry point for FinalRequestProcessor.java
    public ProcessTxnResult processTxn(Request request) {
        TxnHeader hdr = request.getHdr();
        processTxnForSessionEvents(request, hdr, request.getTxn());
    
        final boolean writeRequest = (hdr != null);
        final boolean quorumRequest = request.isQuorum();
    
        // return fast w/o synchronization when we get a read
        if (!writeRequest && !quorumRequest) {
            return new ProcessTxnResult();
        }
        synchronized (outstandingChanges) {
            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
    
            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            if (writeRequest) {
                long zxid = hdr.getZxid();
                while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (outstandingChangesForPath.get(cr.path) == cr) {
                        outstandingChangesForPath.remove(cr.path);
                    }
                }
            }
    
            // do not add non quorum packets to the queue.
            if (quorumRequest) {
                getZKDatabase().addCommittedProposal(request);
            }
            return rc;
        }
    }
    
    private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
        int opCode = (request == null) ? hdr.getType() : request.type;
        long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
    
        if (opCode == OpCode.createSession) {
            if (hdr != null && txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                // Add the session to the local session map or global one in zkDB.
                sessionTracker.commitSession(sessionId, cst.getTimeOut());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
    }
    
    private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
        if (hdr == null) {
            return new ProcessTxnResult();
        } else {
            return getZKDatabase().processTxn(hdr, txn, digest);
        }
    }
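    The pruning loop in processTxn(Request) is worth isolating: once a write with zxid Z is applied, every outstanding change with zxid <= Z is dropped, but the per-path entry is removed only if it still points at that same record, because a newer change to the same path may have replaced it. The sketch reproduces just that loop with hypothetical trimmed-down types (ChangeRecordSketch keeps only zxid and path).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down ChangeRecord: only the fields the loop reads.
class ChangeRecordSketch {
    final long zxid;
    final String path;
    ChangeRecordSketch(long zxid, String path) { this.zxid = zxid; this.path = path; }
}

class OutstandingChangesSketch {
    final Deque<ChangeRecordSketch> outstandingChanges = new ArrayDeque<>();
    final Map<String, ChangeRecordSketch> outstandingChangesForPath = new HashMap<>();

    void add(ChangeRecordSketch cr) {
        outstandingChanges.add(cr);
        outstandingChangesForPath.put(cr.path, cr); // newest change for a path wins
    }

    // Same loop as processTxn(Request): drop every change up to the applied zxid.
    void prune(long appliedZxid) {
        while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= appliedZxid) {
            ChangeRecordSketch cr = outstandingChanges.remove();
            // keep the path entry if a newer change has already replaced it
            if (outstandingChangesForPath.get(cr.path) == cr) {
                outstandingChangesForPath.remove(cr.path);
            }
        }
    }
}
```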
    

    QuorumZooKeeperServer

    Base ZooKeeperServer class for cluster mode:

    • wraps a QuorumPeer to obtain node information
    • wraps an UpgradeableSessionTracker for session tracking

    LeaderZooKeeperServer

    Just like the standard ZooKeeperServer. We just replace the request processors: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor
    

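    Implementation overview

    The ZooKeeperServer implementation used by the leader node in cluster mode:

    • extends QuorumZooKeeperServer

    • uses a different RequestProcessor chain from its parent:

      // build the processor chain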
      protected void setupRequestProcessors() {
          RequestProcessor finalProcessor = new FinalRequestProcessor(this);
          RequestProcessor toBeAppliedProcessor = 
              new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
          commitProcessor = new CommitProcessor(
              toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
          commitProcessor.start();
          ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
          proposalProcessor.initialize();
          prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
          prepRequestProcessor.start();
          firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
      
          setupContainerManager(); // start the ContainerManager, which deletes TTL nodes and container nodes
      }
      
    • uses LeaderSessionTracker for session tracking

    • communicates with the learner nodes

    Processor chain

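    • FinalRequestProcessor - applies the transaction associated with a request, serves queries and sends the response to the client; it sits at the end of the RequestProcessor chain

    • ToBeAppliedRequestProcessor - maintains the toBeApplied list

    • CommitProcessor - waits until a request is committed, then passes it to the downstream RequestProcessor

    • ProposalRequestProcessor - issues proposals and forwards the Request to its internal SyncRequestProcessor and AckRequestProcessor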

      public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
          this.zks = zks;
          this.nextProcessor = nextProcessor;
          // internally maintains a SyncRequestProcessor and an AckRequestProcessor
          AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
          syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
      
          forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
                  FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
      }
      
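    • PrepRequestProcessor - usually at the start of the RequestProcessor chain; it sets up the transactions associated with update requests

    • LeaderRequestProcessor - performs local session upgrades; only Requests submitted directly to the leader pass through this processor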
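    The core idea of CommitProcessor, "hold a write until its commit arrives, then pass it downstream", can be modeled with a queue. The sketch below is deliberately simplified and hypothetical (CommitGateSketch is a made-up name): it assumes commits arrive in zxid order and ignores reads, sessions and threading, all of which the real CommitProcessor handles.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of "writes wait for their commit".
class CommitGateSketch {
    private final Deque<Long> pendingWrites = new ArrayDeque<>();
    final List<Long> passedDownstream = new ArrayList<>();

    // A local write enters the gate and waits.
    void submitWrite(long zxid) {
        pendingWrites.add(zxid);
    }

    // A commit for the oldest pending write releases it to the next processor.
    void commit(long zxid) {
        if (pendingWrites.isEmpty() || pendingWrites.peek() != zxid) {
            throw new IllegalStateException("unexpected commit: " + zxid);
        }
        passedDownstream.add(pendingWrites.poll());
    }
}
```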

    LearnerZooKeeperServer

    Base class for learners:

    • uses LearnerSessionTracker for session tracking
    • uses CommitProcessor and SyncRequestProcessor in its processor chain

    FollowerZooKeeperServer

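    Implementation overview

    Similar to ZooKeeperServer; only the processor chain differs:

    FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

    A SyncRequestProcessor is used to log the proposals received from the leader.

    Processor chain

    setupRequestProcessors method: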

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(
            finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
        syncProcessor.start();
    }
    
    • FinalRequestProcessor
    • CommitProcessor
    • FollowerRequestProcessor - forwards update requests to the leader
    • SyncRequestProcessor
    • SendAckRequestProcessor - sends ACKs to the leader
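    How FollowerRequestProcessor splits traffic can be shown with a tiny router. As I read the source, every request is first passed to the local CommitProcessor (reads are answered locally, writes wait there for the leader's COMMIT), and write requests are additionally forwarded to the leader. FollowerRoutingSketch and its isWrite flag are hypothetical simplifications of that logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of FollowerRequestProcessor's routing.
class FollowerRoutingSketch {
    final List<String> toCommitProcessor = new ArrayList<>();
    final List<String> toLeader = new ArrayList<>();

    void processRequest(String op, boolean isWrite) {
        // Every request is queued locally first, so the follower is ready
        // to match the leader's COMMIT when it arrives.
        toCommitProcessor.add(op);
        if (isWrite) {
            // Writes are also sent to the leader, which runs the proposal.
            toLeader.add(op);
        }
    }
}
```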

    ObserverZooKeeperServer

    The ZooKeeperServer implementation for Observer nodes.

    setupRequestProcessors method:

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(
            finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();
    
        // true by default (zookeeper.observer.syncEnabled); when false, observers do not log to disk
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }
    
  • Original article: https://www.cnblogs.com/xugf/p/17979795