ZooKeeper source code (08): request processing and the data read/write flow


    ServerCnxnFactory

    Accepts client connections, manages client sessions, and processes client requests.

    ServerCnxn abstract class

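    Represents a single client connection. It:

    • reads and writes data on the network
    • encodes and decodes data
    • forwards requests to upper-layer components and receives responses from them
    • manages connection state, such as enableRecv, sessionTimeout, stale, and invalid
    • tracks counters such as packetsReceived, packetsSent, lastCxid, and lastZxid
    • implements the Watcher interface, so it can also act as a watcher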

    Two implementations:

    • NIOServerCnxn - based on NIO
    • NettyServerCnxn - based on Netty

    NIOServerCnxnFactory

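    A non-blocking, multi-threaded ServerCnxnFactory implementation based on NIO. The threads communicate through queues:

    • 1 accept thread, which accepts client connections and hands them to a selector thread
    • 1-N selector threads, each selecting on roughly 1/N of the connections. There are several selector threads because, with a large number of connections, select() can become a performance bottleneck
    • 0-M socket I/O worker threads, which perform socket reads and writes; if this is set to 0, the selector threads do the I/O themselves
    • 1 cleanup thread, which closes idle connections

    Example thread allocation on a 32-core machine: 1 accept thread, 1 cleanup thread, 4 selector threads, and 64 worker threads.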

    configure method

    • SSL is not supported

    • Creates the ConnectionExpirerThread

    • Determines the number of each kind of thread from the CPU core count

      int numCores = Runtime.getRuntime().availableProcessors();
      // 32 cores sweet spot seems to be 4 selector threads
      numSelectorThreads = Integer.getInteger(
          ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
          Math.max((int) Math.sqrt((float) numCores / 2), 1));
      
      // 2 * numCores, e.g. 64 on a 32-core machine
      numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
      
    • Creates the SelectorThread threads

    • Creates the ServerSocketChannel, binds it to the listening address, and makes it non-blocking

    • Creates the AcceptThread
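The two default formulas from configure can be checked with a small stand-alone calculation. This sketch re-implements only the defaults and ignores the ZOOKEEPER_NIO_NUM_SELECTOR_THREADS and ZOOKEEPER_NIO_NUM_WORKER_THREADS system-property overrides. The class NioThreadDefaults is hypothetical.

```java
// Hypothetical helper reproducing the default thread counts from configure():
// selector threads = max((int) sqrt(cores / 2), 1), worker threads = 2 * cores.
class NioThreadDefaults {
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 4, 8, 32, 64}) {
            System.out.println(cores + " cores -> "
                + selectorThreads(cores) + " selector threads, "
                + workerThreads(cores) + " worker threads");
        }
    }
}
```

For 32 cores this yields 4 selector threads and 64 worker threads, which matches the example allocation. The square root keeps the selector count small, while the worker count grows linearly with the core count.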

    start method

    Starts the acceptThread, selectorThreads, workerPool, and expirerThread.

    acceptThread

    A single accept thread accepts client connections and hands them to a selector thread:

    1. select for acceptable keys

    2. doAccept accepts the connection

      if (key.isAcceptable()) {
          if (!doAccept()) {
              pauseAccept(10);
          }
      }
      
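    3. Make sc (the SocketChannel) non-blocking, check that the number of connections from the remote IP does not exceed maxClientCnxns (default 60), and pick a SelectorThread that will select its read/write events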

      // Round-robin assign this connection to a selector thread
      if (!selectorIterator.hasNext()) {
          selectorIterator = selectorThreads.iterator();
      }
      SelectorThread selectorThread = selectorIterator.next();
      // queue the SocketChannel for the selector thread
      if (!selectorThread.addAcceptedConnection(sc)) {
          throw new IOException("Unable to add connection to selector queue");
      }
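The accept path can be modeled without sockets: a per-IP limit check first, then a round-robin hand-off to a selector's queue. In this sketch the class RoundRobinAcceptor, its FakeSelector stand-in, and the use of String for connections are all hypothetical. As in the real server, a maxClientCnxns of 0 is treated as "no limit".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, socket-free model of the accept path: per-IP limit check,
// then round-robin hand-off to a selector's accepted-connection queue.
class RoundRobinAcceptor {
    // Stand-in for SelectorThread; only its accepted-connection queue is modeled.
    static class FakeSelector {
        final Queue<String> acceptedQueue = new ConcurrentLinkedQueue<>();

        boolean addAcceptedConnection(String conn) {
            return acceptedQueue.offer(conn);
        }
    }

    final List<FakeSelector> selectors = new ArrayList<>();
    private final Map<String, Integer> cnxnsPerIp = new HashMap<>();
    private final int maxClientCnxns; // 0 means "no limit"
    private Iterator<FakeSelector> selectorIterator;

    RoundRobinAcceptor(int numSelectors, int maxClientCnxns) {
        for (int i = 0; i < numSelectors; i++) {
            selectors.add(new FakeSelector());
        }
        this.maxClientCnxns = maxClientCnxns;
        this.selectorIterator = selectors.iterator();
    }

    /** Returns false if the connection is rejected or could not be queued. */
    boolean accept(String remoteIp, String conn) {
        int count = cnxnsPerIp.getOrDefault(remoteIp, 0);
        if (maxClientCnxns > 0 && count >= maxClientCnxns) {
            return false; // the real server closes the socket in this case
        }
        cnxnsPerIp.put(remoteIp, count + 1);
        // Restart the iterator once it is exhausted, the same idiom as the snippet above.
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectors.iterator();
        }
        return selectorIterator.next().addAcceptedConnection(conn);
    }
}
```

A rejected connection does not advance the iterator, so the next accepted connection still goes to the next selector in turn.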
      

    selectorThread

    The run method selects read/write events, registers newly accepted client connections, and updates the "interest" ops of keys:

    • run method

      public void run() {
          try {
              while (!stopped) {
                  try {
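                      select(); // select read/write events
                      processAcceptedConnections(); // register newly accepted connections
                      processInterestOpsUpdateRequests(); // apply pending interest-ops updates
                  } catch (RuntimeException e) {
                      // omitted
                  } catch (Exception e) {
                      // omitted
                  }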
              }
          }
          // ...
      }
      
    • Registering an accepted connection: register OP_READ, create an NIOServerCnxn, and attach it to the key

      private void processAcceptedConnections() {
          SocketChannel accepted;
          while (!stopped && (accepted = acceptedQueue.poll()) != null) {
              SelectionKey key = null;
              try {
                  key = accepted.register(selector, SelectionKey.OP_READ);
                  NIOServerCnxn cnxn = createConnection(accepted, key, this);
                  key.attach(cnxn); // attach the connection to the key
                  addCnxn(cnxn); // track the connection for connection-level session expiry
              } catch (IOException e) {
                  // omitted
              }
          }
      }
      
    • When select() reports read/write events, the key is handed to handleIO

      private void handleIO(SelectionKey key) {
          IOWorkRequest workRequest = new IOWorkRequest(this, key);
          NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
      
          // Stop selecting this key while processing on its connection
          cnxn.disableSelectable();
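          key.interestOps(0); // clear the interest ops; read/write interest is re-registered after the I/O completes
          touchCnxn(cnxn); // connection-level session tracking: refresh the expiry time
          workerPool.schedule(workRequest); // workRequest.doWork performs the read/write asynchronously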
      }
      
    • Updating the "interest" ops of keys

      private void processInterestOpsUpdateRequests() {
          SelectionKey key;
          while (!stopped && (key = updateQueue.poll()) != null) {
              NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
              if (cnxn.isSelectable()) {
                  key.interestOps(cnxn.getInterestOps());
              }
          }
      }
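The disableSelectable/interestOps(0) step and the update queue work together. While a worker owns a connection, the key is parked. Only the selector thread restores the key's interest ops, by draining the queue. The sketch below shows this without sockets. It assumes a simplified, hypothetical Conn class that recomputes its ops roughly the way NIOServerCnxn does: OP_READ unless throttled, plus OP_WRITE when responses are pending.

```java
import java.nio.channels.SelectionKey;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, socket-free model of parking a key while a worker owns it
// and restoring its interest ops later on the selector thread.
class InterestOpsModel {
    // Stand-in for an NIOServerCnxn together with its SelectionKey.
    static class Conn {
        int interestOps = SelectionKey.OP_READ; // ops the key is currently selected for
        boolean selectable = true;
        boolean throttled = false; // true -> stop reading from this client
        int pendingWrites = 0;     // queued responses -> need OP_WRITE

        // Recompute the desired ops from the connection state.
        int desiredOps() {
            if (!selectable) {
                return 0;
            }
            int ops = 0;
            if (!throttled) {
                ops |= SelectionKey.OP_READ;
            }
            if (pendingWrites > 0) {
                ops |= SelectionKey.OP_WRITE;
            }
            return ops;
        }
    }

    final Queue<Conn> updateQueue = new ConcurrentLinkedQueue<>();

    // Selector thread: stop selecting this key while a worker processes it.
    void handleIO(Conn c) {
        c.selectable = false;
        c.interestOps = 0;
    }

    // Worker thread: I/O finished, so re-enable and request an interest-ops update.
    // (The real server also wakes up the selector at this point.)
    void workDone(Conn c) {
        c.selectable = true;
        updateQueue.offer(c);
    }

    // Selector thread: apply the queued updates; only this thread touches interestOps.
    void processInterestOpsUpdateRequests() {
        Conn c;
        while ((c = updateQueue.poll()) != null) {
            if (c.selectable) {
                c.interestOps = c.desiredOps();
            }
        }
    }
}
```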
      

    workRequest.doWork method

    workRequest is an IOWorkRequest; doWork reads the data and passes it to the upper-layer components:

    public void doWork() throws InterruptedException {
        // omitted
    
        if (key.isReadable() || key.isWritable()) {
            cnxn.doIO(key); // runs on a workerPool thread
    
            // omitted
            touchCnxn(cnxn); // connection-level session tracking: refresh the expiry time
        }
    
        // omitted
    }
    

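    Packets are framed as len + body: a 4-byte length followed by the body. The read process itself is not covered here. Once cnxn has read a complete packet, it calls readConnectRequest or readRequest to pass the data to the upper-layer components: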

    // application-level connection setup
    private void readConnectRequest() throws IOException, ClientCnxnLimitException {
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        // ConnectRequest fields:
        // protocolVersion(0), lastZxidSeen(0), timeOut(3s), sessionId(0), passwd(16 bytes), readOnly(false)
        ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
        zkServer.processConnectRequest(this, request);
        initialized = true;
    }
    
    protected void readRequest() throws IOException {
        RequestHeader h = new RequestHeader();
        // request header: the xid and type sent by the client
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
        // wrap the request bytes in a ByteBufferRequestRecord;
        // readRecord later deserializes them into the concrete Record implementation
        RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
        zkServer.processPacket(this, h, request);
    }
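The len + body framing can be sketched as a decoder that tolerates partial reads. Like NIOServerCnxn, it first fills a 4-byte length buffer and then a body buffer. This is a simplified sketch, not the actual implementation. The class name and the maxPacketSize parameter (standing in for jute.maxbuffer) are assumptions.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for "4-byte big-endian length + body" framing.
// A chunk may split a packet anywhere, as a non-blocking socket read can.
class LengthPrefixedDecoder {
    private final int maxPacketSize; // assumption: plays the role of jute.maxbuffer
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // start by filling the length header

    LengthPrefixedDecoder(int maxPacketSize) {
        this.maxPacketSize = maxPacketSize;
    }

    /** Consumes one chunk and returns every packet body it completes. */
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> packets = new ArrayList<>();
        int pos = 0;
        while (true) {
            if (!incomingBuffer.hasRemaining()) {
                if (incomingBuffer == lenBuffer) {
                    // Length header complete: allocate a buffer for the body.
                    lenBuffer.flip();
                    int len = lenBuffer.getInt();
                    if (len < 0 || len > maxPacketSize) {
                        throw new IllegalStateException("Bad packet length " + len);
                    }
                    incomingBuffer = ByteBuffer.allocate(len);
                } else {
                    // Body complete: emit it and go back to reading a length.
                    packets.add(incomingBuffer.array());
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                }
                continue;
            }
            if (pos == chunk.length) {
                return packets; // wait for more bytes
            }
            int n = Math.min(chunk.length - pos, incomingBuffer.remaining());
            incomingBuffer.put(chunk, pos, n);
            pos += n;
        }
    }

    /** Helper: prefixes a body with its 4-byte length. */
    static byte[] frame(byte[] body) {
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the bytes of two framed packets in arbitrary slices yields the same two bodies, in order. That is the property the connection relies on before it calls readConnectRequest or readRequest.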
    

    NettyServerCnxnFactory

    A Netty-based ServerCnxnFactory implementation.

    CnxnChannelHandler class

    The core network-layer handler. The important code:

    class CnxnChannelHandler extends ChannelDuplexHandler {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final Channel channel = ctx.channel();
            // omitted
            // create a NettyServerCnxn and bind it to the channel
            NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
    
            // omitted
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                try {
                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                    if (cnxn == null) {
                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                    } else {
                        // read the request data
                        cnxn.processMessage((ByteBuf) msg);
                    }
                } catch (Exception ex) {
                    throw ex;
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    }
    

    cnxn reads the request data

    void processMessage(ByteBuf buf) {
        if (throttled.get()) {
            // omitted
        } else {
            if (queuedBuffer != null) {
                appendToQueuedBuffer(buf.retainedDuplicate());
                processQueuedBuffer();
            } else {
                receiveMessage(buf); // the decoding logic lives in this method
                // Have to check !closingChannel, because an error in
                // receiveMessage() could have led to close() being called.
                if (!closingChannel && buf.isReadable()) {
                    if (queuedBuffer == null) {
                        queuedBuffer = channel.alloc().compositeBuffer();
                    }
                    appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
                }
            }
        }
    }
    

    Once a complete packet has been read, the data is passed to the upper-layer components:

    if (initialized) {
        RequestHeader h = new RequestHeader();
        ByteBufferInputStream.byteBuffer2Record(bb, h);
        RequestRecord request = RequestRecord.fromBytes(bb.slice());
        zks.processPacket(this, h, request);
    } else {
        // application-level connection setup
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
        ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
        zks.processConnectRequest(this, request);
        initialized = true;
    }
    

    ZooKeeperServer processing methods

    processConnectRequest: handling connection requests

    ZooKeeperServer's processConnectRequest method handles connection requests:

    public void processConnectRequest(
            ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
    
        long sessionId = request.getSessionId(); // 0 for a new session
        int tokensNeeded = 1;
        // omitted
    
        // read-only check
        if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
            String msg = "Refusing session request for not-read-only client";
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
        }
        // client zxid check: refuse a client that has seen a newer zxid than this server
        if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
            String msg = "Refusing session request for client "
                         + cnxn.getRemoteSocketAddress()
                         + " as it has seen zxid 0x"
                         + Long.toHexString(request.getLastZxidSeen())
                         + " our last zxid is 0x"
                         + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                         + " client must try another server";
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
        }
        int sessionTimeout = request.getTimeOut(); // client default: 30000
        byte[] passwd = request.getPasswd();
        int minSessionTimeout = getMinSessionTimeout(); // tickTime * 2 by default
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout(); // tickTime * 20 by default
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout); // set the negotiated timeout
    
        // We don't want to receive any packets until we are sure that the session is setup
        cnxn.disableRecv();
    
        if (sessionId == 0) {
            // create a new session
            long id = createSession(cnxn, passwd, sessionTimeout);
        } else {
            validateSession(cnxn, sessionId);
            // close the existing connections of the old session
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            // bind the session to this connection
            cnxn.setSessionId(sessionId);
            // send the connect response
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
            ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
        }
    }
    
    // A closer look at creating a new session
    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        if (passwd == null) {
            passwd = new byte[0];
        }
        long sessionId = sessionTracker.createSession(timeout); // create the session and get its sessionId
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd); // fills passwd in place, i.e. the array from request.passwd
        CreateSessionTxn txn = new CreateSessionTxn(timeout);
        cnxn.setSessionId(sessionId);
        // submit a createSession request to the business-layer processors;
        // RequestRecord.fromRecord(txn) returns a SimpleRequestRecord wrapping the Record
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
        submitRequest(si);
        return sessionId;
    }
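Two details of the code above can be isolated: the session-timeout clamp and the deterministic password. The sketch below passes tickTime in explicitly and uses 0XB3415C00L as the secret, which is assumed to match ZooKeeperServer.superSecret. The class SessionSetupSketch and its methods are hypothetical. Because the password is derived from Random(sessionId ^ secret), it can be validated on reconnect by regenerating it instead of storing it, which is what the sketch's checkPasswd does.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical helpers mirroring two steps of processConnectRequest / createSession.
class SessionSetupSketch {
    // Assumption: stands in for ZooKeeperServer.superSecret.
    static final long SUPER_SECRET = 0XB3415C00L;

    // Clamp the requested timeout into [2 * tickTime, 20 * tickTime], the default bounds.
    static int negotiateTimeout(int requested, int tickTime) {
        int min = tickTime * 2;
        int max = tickTime * 20;
        return Math.max(min, Math.min(requested, max));
    }

    // The password depends only on the session id and the secret.
    static byte[] generatePasswd(long sessionId) {
        byte[] passwd = new byte[16];
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // Validate a reconnecting client's password by regenerating it.
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }
}
```

With tickTime = 2000, a requested timeout of 30000 is kept. A request of 1000 is raised to 4000, and a request of 60000 is lowered to 40000.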
    

    processPacket: handling business requests

    ZooKeeperServer's processPacket method handles business requests:

    public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
    
        cnxn.incrOutstandingAndCheckThrottle(h);
    
        if (h.getType() == OpCode.auth) {
            // omitted
            return;
        } else if (h.getType() == OpCode.sasl) {
            // omitted
        } else {
            if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                return;
            } else {
                Request si = new Request(
                    cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
                int length = request.limit();
                // large-request check
                if (isLargeRequest(length)) { // false by default
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                // submit to the business-layer processors
                submitRequest(si);
            }
        }
    }
    

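    submitRequest flow

    1. The request is first submitted to the requestThrottler component.
    2. requestThrottler is a rate-limiting component (disabled by default). It buffers requests in a queue, and an asynchronous thread consumes the queue and submits each request to the business-layer processors.
    3. Only at submitRequest does business processing leave the workerPool thread.

    The consuming thread handles each dequeued request as follows: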
    if (request != null) {
        if (request.isStale()) {
            ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
        }
        final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
        ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
        // no throttling by default
        if (shouldThrottleOp(request, elapsedTime)) {
            request.setIsThrottled(true);
            ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
        }
        // submit
        zks.submitRequestNow(request);
    }
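The requestThrottler hand-off can be sketched with a BlockingQueue and one consumer thread. This sketch leaves out the throttling itself (disabled by default) and the metrics. ThrottlerSketch and the thread name "RequestThrottler" are assumptions. The sketch shows why the I/O thread returns immediately: submitRequest only enqueues, and submitRequestNow runs on the consumer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical model of the throttler hand-off: callers only enqueue,
// and a dedicated thread dequeues and submits downstream.
class ThrottlerSketch<T> {
    private final BlockingQueue<T> submittedRequests = new LinkedBlockingQueue<>();
    private final Thread worker;

    ThrottlerSketch(Consumer<T> submitRequestNow) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    T request = submittedRequests.take(); // blocks until a request arrives
                    submitRequestNow.accept(request);     // runs on this thread, not the caller's
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        }, "RequestThrottler");
        worker.setDaemon(true);
        worker.start();
    }

    // Called on the I/O (workerPool) thread; returns immediately.
    void submitRequest(T request) {
        submittedRequests.add(request);
    }

    void shutdown() {
        worker.interrupt();
    }
}
```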
    

    submitRequestNow submits the request to the business-layer processors:

    public void submitRequestNow(Request si) {
        // omitted
        try {
            touch(si.cnxn); // refresh the session expiry time
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                firstProcessor.processRequest(si); // hand off to the business-layer processors
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            requestFinished(si);
        } catch (RequestProcessorException e) {
            requestFinished(si);
        }
    }
    

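    Leader business-layer processor chain for client requests

    As covered in an earlier article, the leader uses LeaderZooKeeperServer as its server implementation.

    This section describes how the leader processes client requests.

    Processor chain

    // build the processor chain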
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor =
            new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(
            toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    
        setupContainerManager();
    }
    
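    • FinalRequestProcessor - applies the transactions associated with a request, serves queries, and sends the response to the client; sits at the end of the RequestProcessor chain

    • ToBeAppliedRequestProcessor - maintains the toBeApplied list; its next processor must be a FinalRequestProcessor, whose processRequest must handle the request synchronously

    • CommitProcessor - waits until a request is committed, then calls the downstream RequestProcessor

    • ProposalRequestProcessor - issues a proposal and forwards the Request to its internal SyncRequestProcessor and AckRequestProcessor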

      public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
          this.zks = zks;
          this.nextProcessor = nextProcessor;
          // internally maintains a SyncRequestProcessor whose next processor is an AckRequestProcessor
          AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
          syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
      
          forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
                  FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
      }
      
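    • PrepRequestProcessor - usually at the start of a RequestProcessor chain; sets up the transaction associated with an update request

    • LeaderRequestProcessor - performs local session upgrades; only Requests submitted directly to the leader pass through this processor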
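setupRequestProcessors wires the chain back to front, so each processor receives its successor in its constructor. A synchronous chain of responsibility illustrates the order. This is a deliberately simplified sketch. ChainProcessor, NamedProcessor and LeaderChainSketch are hypothetical, and every processor just records its name. The asynchronous queues are not modeled, and neither is ProposalRequestProcessor's side branch to SyncRequestProcessor/AckRequestProcessor.

```java
import java.util.List;

// Hypothetical, single-threaded model of the leader's processor chain.
interface ChainProcessor {
    void processRequest(List<String> trace);
}

class NamedProcessor implements ChainProcessor {
    private final String name;
    private final ChainProcessor next; // null at the end of the chain

    NamedProcessor(String name, ChainProcessor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void processRequest(List<String> trace) {
        trace.add(name); // stand-in for the processor's real work
        if (next != null) {
            next.processRequest(trace);
        }
    }
}

class LeaderChainSketch {
    // Built back to front, in the same order as setupRequestProcessors.
    static ChainProcessor build() {
        ChainProcessor finalProcessor = new NamedProcessor("FinalRequestProcessor", null);
        ChainProcessor toBeApplied = new NamedProcessor("ToBeAppliedRequestProcessor", finalProcessor);
        ChainProcessor commit = new NamedProcessor("CommitProcessor", toBeApplied);
        ChainProcessor proposal = new NamedProcessor("ProposalRequestProcessor", commit);
        ChainProcessor prep = new NamedProcessor("PrepRequestProcessor", proposal);
        return new NamedProcessor("LeaderRequestProcessor", prep);
    }
}
```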

    LeaderRequestProcessor

    public void processRequest(Request request) throws RequestProcessorException {
        // omitted
    
        // local sessions are not enabled by default
        Request upgradeRequest = null;
        try {
            upgradeRequest = lzks.checkUpgradeSession(request);
        } catch (KeeperException ke) {
            // omitted
        } catch (IOException ie) {
            // omitted
        }
        // upgradeRequest is null here
        if (upgradeRequest != null) {
            nextProcessor.processRequest(upgradeRequest);
        }
        // call the downstream processor
        nextProcessor.processRequest(request);
    }
    

    PrepRequestProcessor

    Transaction setup:

    • requests are buffered in a queue
    • a consumer thread takes requests from the queue and sets up their transactions

    run method

    public void run() {
        try {
            while (true) {
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                Request request = submittedRequests.take();
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                    .add(Time.currentElapsedTime() - request.prepQueueStartTime);
                // omitted
                if (Request.requestOfDeath == request) {
                    break;
                }
    
                request.prepStartTime = Time.currentElapsedTime();
                pRequest(request);
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
    }
    
    protected void pRequest(Request request) throws RequestProcessorException {
        request.setHdr(null);
        request.setTxn(null);
    
        if (!request.isThrottled()) {
            pRequestHelper(request);
        }
    
        request.zxid = zks.getZxid(); // zxid
        long timeFinishedPrepare = Time.currentElapsedTime();
        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
        nextProcessor.processRequest(request); // call the downstream processor
        ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
    }
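The queue-plus-sentinel loop in run() can be sketched on its own. PoisonPillProcessor and shutdownProcessor are hypothetical names. REQUEST_OF_DEATH plays the role of Request.requestOfDeath, and the identity comparison mirrors the `Request.requestOfDeath == request` check. Because the sentinel is queued like any other request, everything submitted before it is still processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of PrepRequestProcessor's loop: a queue, a consumer thread,
// and a sentinel object that stops the loop.
class PoisonPillProcessor extends Thread {
    static final Object REQUEST_OF_DEATH = new Object();

    private final BlockingQueue<Object> submittedRequests = new LinkedBlockingQueue<>();
    final List<Object> processed = new ArrayList<>();

    // Producer side: just enqueue.
    void processRequest(Object request) {
        submittedRequests.add(request);
    }

    // The sentinel is queued behind any pending requests.
    void shutdownProcessor() {
        submittedRequests.add(REQUEST_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object request = submittedRequests.take();
                if (request == REQUEST_OF_DEATH) {
                    break; // identity check, as in the original loop
                }
                processed.add(request); // stand-in for pRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```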
    

    pRequestHelper method

    private void pRequestHelper(Request request) {
        try {
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                // create request: path, data, acl, flags
                CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
                // zks.getNextZxid() returns the next, incremented zxid
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
                break;
            case OpCode.createTTL:
                // createTTL request: path, data, acl, flags, ttl
                CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
                break;
            case OpCode.deleteContainer:
                // carries the path
                DeleteContainerRequest deleteContainerRequest =
                    request.readRequestRecord(DeleteContainerRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
                break;
            case OpCode.delete:
                // delete request: path, version
                DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
                break;
            case OpCode.setData:
                // setData request: path, data, version
                SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
                break;
            case OpCode.reconfig:
                // reconfig request: joiningServers, leavingServers, newMembers, curConfigId
                ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
                break;
            case OpCode.setACL:
                // setACL request: path, acl, version
                SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
                break;
            case OpCode.check:
                // check request: path, version
                CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
                break;
            case OpCode.multi:
                // iterate over the sub-requests, calling
                // pRequest2Txn(op.getType(), zxid, request, subrequest) for each one,
                // and wrap the results in a MultiTxn
                break;
    
            // create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) { // not a local session
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null);
                }
                break;
    
            // All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            // sync,exists,getData,getACL,getChildren,getAllChildrenNumber,getChildren2,ping
            // setWatches,setWatches2,checkWatches,removeWatches,getEphemerals,multiRead,addWatch
            case OpCode.whoAmI:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                break;
            default:
                break;
            }
        } catch (KeeperException e) {
            // omitted
        } catch (Exception e) {
            // omitted
        }
    }
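The zxid handling in pRequestHelper and pRequest can be modeled with an AtomicLong. In this sketch the Op enum and its write/read split are a simplified, hypothetical subset of OpCode, and hzxid stands in for the server's zxid counter. Only write requests consume a new zxid through getNextZxid(). Every request then leaves pRequest with request.zxid set to the current value.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of zxid assignment in pRequestHelper / pRequest.
class ZxidAssignmentSketch {
    enum Op { CREATE, DELETE, SET_DATA, SET_ACL, GET_DATA, EXISTS, GET_CHILDREN, SYNC }

    // Ops that go through pRequest2Txn with getNextZxid() (illustrative subset).
    static final Set<Op> WRITE_OPS = EnumSet.of(Op.CREATE, Op.DELETE, Op.SET_DATA, Op.SET_ACL);

    private final AtomicLong hzxid = new AtomicLong(0); // stand-in for the zxid counter

    long getNextZxid() {
        return hzxid.incrementAndGet();
    }

    long getZxid() {
        return hzxid.get();
    }

    // Returns the zxid a request carries after pRequest.
    long prepare(Op op) {
        if (WRITE_OPS.contains(op)) {
            getNextZxid(); // a new transaction consumes a zxid (used in its TxnHeader)
        }
        // reads only have their session checked; pRequest then sets request.zxid = getZxid()
        return getZxid();
    }
}
```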
    

    pRequest2Txn flow

    The method is long, so only the main request types are analyzed briefly here.

    It first sets the request's TxnHeader:

    if (request.getHdr() == null) {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
    }
    

    TxnHeader, the transaction header:

    public class TxnHeader implements Record {
      private long clientId; // session id
      private int cxid; // client xid
      private long zxid; // server-side zxid
      private long time; // wall-clock time
      private int type; // operation type
    }
    

    pRequest2Txn - create operations

    create/create2/createTTL/createContainer:

    1. Build createMode from the flags and validate the ttl and ephemeral settings

    2. Check the ACL

      zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
      
    3. Generate the sequential node path

      int parentCVersion = parentRecord.stat.getCversion();
      if (createMode.isSequential()) {
          // e.g. /users/admin0000000001
          path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
      }
      validatePath(path, request.sessionId);
      
    4. request.setTxn

      int newCversion = parentRecord.stat.getCversion() + 1;
      if (type == OpCode.createContainer) {
          request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
      } else if (type == OpCode.createTTL) {
          request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
      } else {
          request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
      }
      
    5. Get the ephemeralOwner

      TxnHeader hdr = request.getHdr();
      long ephemeralOwner = 0;
      if (createMode.isContainer()) {
          ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
      } else if (createMode.isTTL()) {
          ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
      } else if (createMode.isEphemeral()) {
          ephemeralOwner = request.sessionId;
      }
      
    6. addChangeRecord

      StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
      parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
      parentRecord.childCount++;
      parentRecord.stat.setCversion(newCversion);
      parentRecord.stat.setPzxid(request.getHdr().getZxid());
      parentRecord.precalculatedDigest = precalculateDigest(
              DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
      addChangeRecord(parentRecord); // record the change in outstandingChanges
      ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
      nodeRecord.data = data;
      nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
      setTxnDigest(request, nodeRecord.precalculatedDigest);
      addChangeRecord(nodeRecord); // record the change in outstandingChanges
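The sequential-name step (3) and the ephemeralOwner step (5) can be tried in isolation. CreateSketch is hypothetical. It keeps a single parent cversion counter and models only the plain ephemeral case. Container and TTL nodes use the special encodings from EphemeralType, which are not reproduced here. Every create bumps the parent's cversion, so sequential suffixes keep increasing even when non-sequential children are created in between.

```java
import java.util.Locale;

// Hypothetical model of the sequential-name and ephemeralOwner steps.
class CreateSketch {
    private int parentCVersion = 0; // stand-in for parentRecord.stat.getCversion()

    // Returns the final path; every create (sequential or not) bumps the parent's cversion.
    String create(String path, boolean sequential) {
        if (sequential) {
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        parentCVersion++; // newCversion = cversion + 1
        return path;
    }

    // Only the plain ephemeral case: the owner is the creating session, otherwise 0.
    static long ephemeralOwner(boolean ephemeral, long sessionId) {
        return ephemeral ? sessionId : 0;
    }
}
```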
      

    pRequest2Txn - delete operation

    1. Check the ACL and the version

      checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
      
      // stat.version must equal the version in the delete request (a requested version of -1 matches any version)
      
    2. Check that the node has no children; a node with children cannot be deleted

    3. Create a DeleteTxn

      request.setTxn(new DeleteTxn(path));
      
    4. addChangeRecord

      parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
      parentRecord.childCount--;
      parentRecord.stat.setPzxid(request.getHdr().getZxid());
      parentRecord.precalculatedDigest = precalculateDigest(
              DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
      addChangeRecord(parentRecord); // record the change in outstandingChanges
      
      nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
      nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
      setTxnDigest(request, nodeRecord.precalculatedDigest);
      addChangeRecord(nodeRecord); // record the change in outstandingChanges
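The version check in step 1 can be sketched as follows. The rule that -1 matches any version is ZooKeeper's documented behavior for delete and setData. The class VersionCheckSketch and its exception type are hypothetical stand-ins; the real code throws KeeperException.BadVersionException. The returned value is the incremented version, which setData uses as newVersion.

```java
// Hypothetical re-implementation of the version check used by delete/setData/setACL.
class VersionCheckSketch {
    static class BadVersionException extends Exception {
        BadVersionException(String path) {
            super("BadVersion for " + path);
        }
    }

    // -1 means "any version"; otherwise the expected version must equal the current one.
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
            throws BadVersionException {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new BadVersionException(path);
        }
        return currentVersion + 1;
    }
}
```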
      

    pRequest2Txn - setData operation

    1. Check the ACL and compute newVersion

    2. Create a SetDataTxn

      request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
      
    3. addChangeRecord

      nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
      nodeRecord.stat.setVersion(newVersion);
      nodeRecord.stat.setMtime(request.getHdr().getTime());
      nodeRecord.stat.setMzxid(zxid);
      nodeRecord.data = setDataRequest.getData();
      nodeRecord.precalculatedDigest = precalculateDigest(
              DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
      setTxnDigest(request, nodeRecord.precalculatedDigest);
      addChangeRecord(nodeRecord);
      

    pRequest2Txn - setACL operation

    1. Check the ACL and compute newVersion

    2. Create a SetACLTxn

      request.setTxn(new SetACLTxn(path, listACL, newVersion));
      
    3. addChangeRecord

    pRequest2Txn - createSession operation

    CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
    request.setTxn(createSessionTxn);
    // only add the global session tracker but not to ZKDb
    zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
    zks.setOwner(request.sessionId, request.getOwner());
    

    pRequest2Txn - closeSession operation

    long startTime = Time.currentElapsedTime();
    synchronized (zks.outstandingChanges) {
        // all ephemeral nodes currently owned by the session
        Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
        for (ChangeRecord c : zks.outstandingChanges) {
            if (c.stat == null) {
                // Doing a delete
                es.remove(c.path);
            } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                es.add(c.path);
            }
        }
        for (String path2Delete : es) {
            if (digestEnabled) {
                parentPath = getParentPathAndValidate(path2Delete);
                parentRecord = getRecordForPath(parentPath);
                parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                parentRecord.stat.setPzxid(request.getHdr().getZxid());
                parentRecord.precalculatedDigest = precalculateDigest(
                        DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
                addChangeRecord(parentRecord);
            }
            nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
            nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path2Delete);
            addChangeRecord(nodeRecord);
        }
        if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
            request.setTxn(new CloseSessionTxn(new ArrayList<>(es)));
        }
        zks.sessionTracker.setSessionClosing(request.sessionId);
    }
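The set computation at the top of the closeSession branch overlays the not-yet-applied changes on the committed ephemerals. Pending deletes are removed from the set, and pending creates owned by the session are added. The sketch below uses a hypothetical Change class, in which a null ephemeralOwner stands for `c.stat == null` (a pending delete).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of how closeSession decides which ephemeral nodes to delete.
class CloseSessionSketch {
    // Stand-in for ChangeRecord; ephemeralOwner == null models "stat == null" (a delete).
    static class Change {
        final String path;
        final Long ephemeralOwner;

        Change(String path, Long ephemeralOwner) {
            this.path = path;
            this.ephemeralOwner = ephemeralOwner;
        }
    }

    static Set<String> ephemeralsToDelete(Set<String> committed, List<Change> outstanding, long sessionId) {
        Set<String> es = new HashSet<>(committed); // copy: do not mutate the caller's set
        for (Change c : outstanding) {
            if (c.ephemeralOwner == null) {
                es.remove(c.path); // already being deleted
            } else if (c.ephemeralOwner == sessionId) {
                es.add(c.path);    // being created by this session, not yet applied
            }
        }
        return es;
    }
}
```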
    

    ProposalRequestProcessor

    processRequest method

    public void processRequest(Request request) throws RequestProcessorException {
        if (request instanceof LearnerSyncRequest) { // the sync command; analyzed in a later article
            zks.getLeader().processSync((LearnerSyncRequest) request);
        } else {
            if (shouldForwardToNextProcessor(request)) {
                nextProcessor.processRequest(request); // call the downstream processor (CommitProcessor)
            }
            if (request.getHdr() != null) { // transactions must be proposed and written to disk
                // We need to sync and get consensus on any transactions
                try {
                    zks.getLeader().propose(request); // send the proposal to the followers
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException(e.getMessage(), e);
                }
                // syncProcessor's nextProcessor is the AckRequestProcessor
                syncProcessor.processRequest(request);
            }
        }
    }
    

    Sending the proposal

    Creates a proposal and sends it to all members:

    public Proposal propose(Request request) throws XidRolloverException {
        // the low 32 bits of the zxid are exhausted: force a re-election, which starts a new epoch and a new zxid sequence
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            shutdown(msg);
            throw new XidRolloverException(msg);
        }
    
        // serialize
        byte[] data = request.getSerializeData();
        proposalStats.setLastBufferSize(data.length);
        // build the packet
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    
        // build the Proposal object
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
    
        synchronized (this) {
            p.addQuorumVerifier(self.getQuorumVerifier());
    
            if (request.getHdr().getType() == OpCode.reconfig) {
                // this writes lastSeenQuorumVerifier to the zoo.cfg.dynamic.next file
                self.setLastSeenQuorumVerifier(request.qv, true);
            }
    
            if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
    
            lastProposed = p.packet.getZxid();
            // cache in outstandingProposals; processAck uses the quorum state to decide whether to commit
            outstandingProposals.put(lastProposed, p);
            // send the packet to the followers
            sendPacket(pp);
        }
        ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
        return p;
    }
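The rollover check in propose relies on the zxid layout: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter. The sketch below shows the bit manipulation. The helper names are hypothetical; ZooKeeper has similar helpers in ZxidUtils.

```java
// Hypothetical helpers for the zxid layout: high 32 bits = epoch, low 32 bits = counter.
class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Same test as in propose(): the per-epoch counter has reached its maximum.
    static boolean counterExhausted(long zxid) {
        return (zxid & 0xffffffffL) == 0xffffffffL;
    }
}
```

The counter cannot grow past 0xffffffff without carrying into the epoch bits. For that reason the leader steps down and forces an election, so that a new epoch starts with a fresh counter.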
    

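    syncProcessor.processRequest method

    processRequest puts the request into the queuedRequests queue; an asynchronous thread consumes the queue and processes each request:

    1. Take a request from queuedRequests

    2. Append it to the txnlog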

      zks.getZKDatabase().append(si);
      
    3. Roll the txnlog file and generate a snapshot file

      // by default triggered once logCount exceeds 50000 or logSize exceeds 2GB, each plus a random offset
      if (shouldSnapshot()) {
          resetSnapshotStats();
          // roll the txnlog file
          zks.getZKDatabase().rollLog();
          // generate a snapshot file
          if (!snapThreadMutex.tryAcquire()) {
              LOG.warn("Too busy to snap, skipping");
          } else {
              // take the snapshot on a separate thread
              new ZooKeeperThread("Snapshot Thread") {
                  public void run() {
                      try {
                          zks.takeSnapshot();
                      } catch (Exception e) {
                          LOG.warn("Unexpected exception", e);
                      } finally {
                          snapThreadMutex.release();
                      }
                  }
              }.start();
          }
      }
      
    4. Finally, the request is passed to nextProcessor (an AckRequestProcessor object)
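The snapshot condition in step 3 can be modelled as below. This is a simplified sketch under a few assumptions: a default snapCount of 100000 and a 4 GB size limit (so the 50000 / 2GB figures in the comment are the halves of those values), with the random offset redrawn from [0, half) after every roll. The class and its constructor are hypothetical. The offsets are passed in so the behaviour is deterministic in a test.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of the snapshot trigger in SyncRequestProcessor.
// A snapshot (and txnlog roll) is due once either counter passes half of its
// limit plus a random offset, so servers do not all snapshot at the same time.
final class SnapshotPolicySketch {
    private final long snapCount;       // assumed default: 100000 transactions
    private final long snapSizeInBytes; // assumed default: 4 GB; <= 0 disables the size check
    private long randRoll;              // offset in [0, snapCount / 2)
    private long randSize;              // offset in [0, snapSizeInBytes / 2)

    SnapshotPolicySketch(long snapCount, long snapSizeInBytes, long randRoll, long randSize) {
        this.snapCount = snapCount;
        this.snapSizeInBytes = snapSizeInBytes;
        this.randRoll = randRoll;
        this.randSize = randSize;
    }

    boolean shouldSnapshot(long logCount, long logSizeBytes) {
        return logCount > snapCount / 2 + randRoll
            || (snapSizeInBytes > 0 && logSizeBytes > snapSizeInBytes / 2 + randSize);
    }

    // Redraw the offsets after each roll (assumes snapCount >= 2).
    void resetSnapshotStats() {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        randRoll = rnd.nextLong(snapCount / 2);
        randSize = snapSizeInBytes > 1 ? rnd.nextLong(snapSizeInBytes / 2) : 0;
    }

    public static void main(String[] args) {
        SnapshotPolicySketch p = new SnapshotPolicySketch(100_000, 4L << 30, 0, 0);
        System.out.println(p.shouldSnapshot(50_000, 0)); // false
        System.out.println(p.shouldSnapshot(50_001, 0)); // true
    }
}
```

Under these assumed defaults, each snapshot comes somewhere between 50000 and 100000 transactions after the previous one.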

    AckRequestProcessor

    public void processRequest(Request request) {
        QuorumPeer self = leader.self;
        if (self != null) {
            request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
            leader.processAck(self.getMyId(), request.zxid, null);
        }
    }
    

    processAck

    Keep a count of acks that are received by the leader for a particular proposal.

    public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
    
        // omitted
    
        if ((zxid & 0xffffffffL) == 0) {
            // We no longer process NEWLEADER ack with this method. However,
            // the learner sends an ack back to the leader after it gets
            // UPTODATE, so we just ignore the message.
            return;
        }
    
        if (outstandingProposals.size() == 0) {
            return;
        }
        // lastCommitted >= zxid means the data for this zxid has already been committed
        if (lastCommitted >= zxid) {
            // The proposal has already been committed
            return;
        }
        Proposal p = outstandingProposals.get(zxid);
        if (p == null) {
            return;
        }
    
        // omitted
    
        p.addAck(sid); // record the ack from this server
    
        boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
    
        // special handling for reconfig commands, omitted
    }
    

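    The tryToCommit method checks whether the proposal has reached quorum, i.e. whether more than half of the voting members have acked it. If it has, the leader: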

    1. Removes it from outstandingProposals

    2. Adds it to toBeApplied

    3. Sends COMMIT to the followers (observers receive an INFORM packet instead)

      public void commit(long zxid) {
          synchronized (this) {
              lastCommitted = zxid;
          }
          QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
          sendPacket(qp); // send to the followers
      }
      
    4. Commits it locally

      zk.commitProcessor.commit(p.request);
      
      // the request lands in commitProcessor's committedRequests queue
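Below is a simplified, single-threaded model of the leader's bookkeeping in processAck and tryToCommit. It rests on these assumptions: a fixed voter set with a plain majority stands in for ZooKeeper's QuorumVerifier, proposals are identified only by consecutive zxids, and all names are hypothetical. The `zxid - 1` check mirrors the in-order commit rule in tryToCommit. The loop that then commits later proposals which already have a majority is something ZooKeeper only does for reconfig proposals. It is included here only so the sketch never stalls.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical single-threaded model of Leader.processAck / tryToCommit.
final class AckTrackerSketch {
    private final Set<Long> voters;                                   // voting server ids
    private final Map<Long, Set<Long>> outstanding = new HashMap<>(); // zxid -> ids that acked
    private long lastCommitted = -1;

    AckTrackerSketch(Set<Long> voters) {
        this.voters = voters;
    }

    // Corresponds to propose(): remember the proposal until a quorum acks it.
    void propose(long zxid) {
        outstanding.put(zxid, new HashSet<>());
    }

    // Returns true if this ack caused at least one proposal to be committed.
    boolean processAck(long sid, long zxid) {
        if (zxid <= lastCommitted) {
            return false; // already committed
        }
        Set<Long> acks = outstanding.get(zxid);
        if (acks == null) {
            return false; // unknown proposal
        }
        if (voters.contains(sid)) {
            acks.add(sid);
        }
        return tryToCommit(zxid);
    }

    private boolean hasQuorum(long zxid) {
        return outstanding.get(zxid).size() > voters.size() / 2; // strict majority
    }

    private boolean tryToCommit(long zxid) {
        if (outstanding.containsKey(zxid - 1)) {
            return false; // commit strictly in zxid order
        }
        if (!hasQuorum(zxid)) {
            return false;
        }
        commit(zxid);
        // Not done by ZooKeeper for ordinary proposals: catch up on later ones that
        // reached a majority while waiting for this one.
        for (long next = zxid + 1; outstanding.containsKey(next) && hasQuorum(next); next++) {
            commit(next);
        }
        return true;
    }

    private void commit(long zxid) {
        outstanding.remove(zxid); // leaves outstandingProposals
        lastCommitted = zxid;     // the real leader now sends COMMIT / INFORM
    }

    long lastCommitted() {
        return lastCommitted;
    }

    public static void main(String[] args) {
        AckTrackerSketch leader = new AckTrackerSketch(Set.of(1L, 2L, 3L));
        leader.propose(1);
        System.out.println(leader.processAck(1, 1)); // false: 1 of 3
        System.out.println(leader.processAck(2, 1)); // true: 2 of 3 is a majority
    }
}
```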
      

    CommitProcessor

    processRequest method

    The previous processor in the chain calls this method to hand over each request:

    1. Add every request to the queuedRequests queue
    2. Also add write requests to the queuedWriteRequests queue

    public void processRequest(Request request) {
        request.commitProcQueueStartTime = Time.currentElapsedTime();
        queuedRequests.add(request); // every request
        // If the request will block, add it to the queue of blocking requests
        if (needCommit(request)) { // write requests
            queuedWriteRequests.add(request);
            numWriteQueuedRequests.incrementAndGet();
        } else {
            numReadQueuedRequests.incrementAndGet();
        }
        wakeup();
    }
    

    commit method

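    Once the followers' acks for a proposal reach quorum, this method is used to commit the transaction, which is then applied to the ZKDatabase further down the chain.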

    public void commit(Request request) {
        request.commitRecvTime = Time.currentElapsedTime();
        ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
        committedRequests.add(request); // enqueue into committedRequests
        wakeup();
    }
    

    run method

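    Compares the queuedRequests, queuedWriteRequests and committedRequests queues and forwards committed write requests, as well as read requests, to the next processor: ToBeAppliedRequestProcessor on the leader, FinalRequestProcessor on followers and observers.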
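The real matching in run() is involved (per-session queues, a worker pool, batching). What follows is only a rough single-threaded sketch of the idea, with hypothetical names. It keeps one global FIFO, which is stricter than ZooKeeper: a read waits behind any earlier local write, not only behind writes from its own session. A commit that does not match the local head belongs to a request submitted through another server and is forwarded as-is.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded sketch of the matching done in CommitProcessor.run().
final class CommitMatcherSketch {
    record Req(long sessionId, int cxid, boolean write) {}

    private final Deque<Req> queuedRequests = new ArrayDeque<>();    // local requests, arrival order
    private final Deque<Req> committedRequests = new ArrayDeque<>(); // commits, zxid order
    final List<Req> forwarded = new ArrayList<>();                   // handed to the next processor

    void processRequest(Req r) {
        queuedRequests.add(r);
        drain();
    }

    void commit(Req r) {
        committedRequests.add(r);
        drain();
    }

    private void drain() {
        while (true) {
            Req head = queuedRequests.peek();
            if (head != null && !head.write()) {
                forwarded.add(queuedRequests.poll()); // reads need no commit
                continue;
            }
            Req c = committedRequests.poll();
            if (c == null) {
                return; // a write (or nothing) is at the head and no commit has arrived
            }
            if (head != null && head.sessionId() == c.sessionId() && head.cxid() == c.cxid()) {
                forwarded.add(queuedRequests.poll()); // our own write: it owns the client connection
            } else {
                forwarded.add(c); // a write submitted through another server
            }
        }
    }

    public static void main(String[] args) {
        CommitMatcherSketch m = new CommitMatcherSketch();
        m.processRequest(new Req(1, 1, true));  // write waits for its commit
        m.processRequest(new Req(2, 1, false)); // read queued behind it
        System.out.println(m.forwarded.size()); // 0
        m.commit(new Req(1, 1, true));
        System.out.println(m.forwarded.size()); // 2
    }
}
```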

    ToBeAppliedRequestProcessor

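    Maintains the toBeApplied list: once a committed request has been passed on to the next processor, its entry is removed from the list.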
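Here is a minimal sketch of that bookkeeping, with requests reduced to their zxid and all names hypothetical. As far as we can tell from the source, the leader keeps committed-but-not-yet-applied proposals in toBeApplied so that it can still send them to a learner that is syncing at that moment. For that reason the entry is dropped only after the next processor has applied it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongConsumer;

// Hypothetical sketch of ToBeAppliedRequestProcessor; requests are reduced to their zxid.
final class ToBeAppliedSketch {
    private final Deque<Long> toBeApplied; // committed zxids not yet applied, oldest first
    private final LongConsumer next;       // stands in for FinalRequestProcessor

    ToBeAppliedSketch(Deque<Long> toBeApplied, LongConsumer next) {
        this.toBeApplied = toBeApplied;
        this.next = next;
    }

    // isTxn corresponds to request.getHdr() != null (a write that went through consensus).
    void processRequest(long zxid, boolean isTxn) {
        next.accept(zxid); // apply first ...
        if (isTxn) {
            Long head = toBeApplied.peek();
            if (head != null && head == zxid) {
                toBeApplied.poll(); // ... then forget the entry
            }
        }
    }

    public static void main(String[] args) {
        Deque<Long> list = new ArrayDeque<>();
        list.add(5L);
        ToBeAppliedSketch p = new ToBeAppliedSketch(list, z -> System.out.println("applied " + z));
        p.processRequest(5, true);          // applied 5
        System.out.println(list.isEmpty()); // true
    }
}
```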

    FinalRequestProcessor

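    Sits at the end of the RequestProcessor chain: it applies the transaction associated with a request, serves queries, and sends the response to the client.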

    1. Apply the transaction
    2. Run the logic for the request's OpCode and build the response
    3. Send the response back to the client through cnxn

    Applying the transaction

    if (!request.isThrottled()) {
        rc = applyRequest(request);
    }
    
    // applyRequest internally calls: ProcessTxnResult rc = zks.processTxn(request);
    

    createSession operation

    zks.finishSessionInit(request.cnxn, true);
    

    closeSession operation

    Sends a closeConn packet to the client:

    cnxn.sendCloseSession();
    

    create operations

    For create, create2, createTTL and createContainer, builds the corresponding Response object.

    delete operations

    Omitted.

    setData operation

    Returns a SetDataResponse.

    setACL operation

    Returns a SetACLResponse.

    getData operation

    private Record handleGetDataRequest(
            Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
        GetDataRequest getDataRequest = (GetDataRequest) request;
        String path = getDataRequest.getPath();
        DataNode n = zks.getZKDatabase().getNode(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        // check that the caller has READ permission on the node
        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
        Stat stat = new Stat();
        // read the data; if a watch was requested, register cnxn as the watcher
        byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(b, stat);
    }
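handleGetDataRequest passes cnxn as the watcher only when getWatch() is true. Standard ZooKeeper data watches are one-shot: the next change to the node notifies each registered watcher once and then removes it. The in-memory model below illustrates that contract. Strings stand in for ServerCnxn objects, and the class is hypothetical, not the DataTree/WatchManager implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of one-shot data watches (not DataTree/WatchManager).
final class DataWatchSketch {
    private final Map<String, byte[]> nodes = new HashMap<>();
    private final Map<String, Set<String>> dataWatches = new HashMap<>(); // path -> watcher ids
    final List<String> notifications = new ArrayList<>();                 // "watcher:path"

    void create(String path, byte[] data) {
        nodes.put(path, data);
    }

    // watcherId plays the role of cnxn; null means getWatch() was false.
    byte[] getData(String path, String watcherId) {
        byte[] data = nodes.get(path);
        if (data == null) {
            throw new IllegalArgumentException("NoNode for " + path);
        }
        if (watcherId != null) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcherId);
        }
        return data;
    }

    void setData(String path, byte[] data) {
        if (!nodes.containsKey(path)) {
            throw new IllegalArgumentException("NoNode for " + path);
        }
        nodes.put(path, data);
        Set<String> watchers = dataWatches.remove(path); // one-shot: removed once triggered
        if (watchers != null) {
            for (String w : watchers) {
                notifications.add(w + ":" + path);
            }
        }
    }

    public static void main(String[] args) {
        DataWatchSketch db = new DataWatchSketch();
        db.create("/a", new byte[] {1});
        db.getData("/a", "cnxn-1");
        db.setData("/a", new byte[] {2});
        db.setData("/a", new byte[] {3});
        System.out.println(db.notifications); // [cnxn-1:/a]
    }
}
```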
    

    setWatches operations

    setWatches and setWatches2 operations:

    SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
    long relativeZxid = setWatches.getRelativeZxid();
    zks.getZKDatabase().setWatches(relativeZxid,
            setWatches.getDataWatches(),
            setWatches.getExistWatches(),
            setWatches.getChildWatches(),
            setWatches.getPersistentWatches(),
            setWatches.getPersistentRecursiveWatches(),
            cnxn);
    

    addWatch operation

    AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
    zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
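Unlike getData's one-shot watches, addWatch registers a watch with a mode (PERSISTENT or PERSISTENT_RECURSIVE in ZooKeeper 3.6+). Such a watch stays registered after it fires, and a recursive one also fires for changes to any descendant. The sketch below uses hypothetical names. It models only data changes on a node and leaves out the child-list events that a persistent watch also receives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of persistent watches; one watcher per path for brevity.
final class PersistentWatchSketch {
    enum Mode { PERSISTENT, PERSISTENT_RECURSIVE }

    private final Map<String, Mode> watches = new HashMap<>();

    void addWatch(String path, Mode mode) {
        watches.put(path, mode);
    }

    // Returns the watched paths that fire when the data of `changed` is modified.
    // Nothing is removed: persistent watches survive being triggered.
    List<String> trigger(String changed) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, Mode> e : watches.entrySet()) {
            String watched = e.getKey();
            boolean recursive = e.getValue() == Mode.PERSISTENT_RECURSIVE;
            boolean descendant = watched.equals("/")
                ? !changed.equals("/")
                : changed.startsWith(watched + "/");
            if (watched.equals(changed) || (recursive && descendant)) {
                fired.add(watched);
            }
        }
        fired.sort(null); // deterministic order for printing
        return fired;
    }

    public static void main(String[] args) {
        PersistentWatchSketch w = new PersistentWatchSketch();
        w.addWatch("/cfg", Mode.PERSISTENT_RECURSIVE);
        System.out.println(w.trigger("/cfg/db/url")); // [/cfg]
        System.out.println(w.trigger("/cfg/db/url")); // [/cfg] again
    }
}
```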
    

    removeWatches operation

    RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
    WatcherType type = WatcherType.fromInt(removeWatches.getType());
    path = removeWatches.getPath();
    boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
    

    getChildren operations

    getChildren and getChildren2 operations:

    GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
    Stat stat = new Stat();
    path = getChildren2Request.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    
    zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
                 ZooDefs.Perms.READ, request.authInfo, path, null);
    List<String> children = zks.getZKDatabase()
                               .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
    rsp = new GetChildren2Response(children, stat);
    

    How the Follower processes data from the Leader

    Processor chain

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(
            finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
    
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
        syncProcessor.start();
    }
    

    Processor chain:

    FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
    

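    Outside this chain, syncProcessor persists the leader's PROPOSAL packets (and acks them through SendAckRequestProcessor), while commitProcessor handles its COMMIT packets.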
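The wiring in setupRequestProcessors is a plain chain of responsibility: each processor holds a reference to the next one and forwards the request after doing its own part. Here is a minimal synchronous sketch with a hypothetical Processor interface. ZooKeeper's real RequestProcessor also has shutdown(), and most stages run on their own threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous chain-of-responsibility sketch of the follower pipeline.
final class ChainSketch {
    // Same order as setupRequestProcessors(): built back to front, entered at the front.
    static Processor build(List<String> trace) {
        Processor finalStage = new Stage("Final", null, trace);
        Processor commitStage = new Stage("Commit", finalStage, trace);
        return new Stage("Follower", commitStage, trace);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build(trace).processRequest("getData /a");
        System.out.println(trace); // [Follower:getData /a, Commit:getData /a, Final:getData /a]
    }
}

interface Processor {
    void processRequest(String request);
}

final class Stage implements Processor {
    private final String name;
    private final Processor next; // null for the last stage
    private final List<String> trace;

    Stage(String name, Processor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void processRequest(String request) {
        trace.add(name + ":" + request); // this stage's own work
        if (next != null) {
            next.processRequest(request); // hand over to the next stage
        }
    }
}
```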

    processPacket method

    As covered in the earlier post "zookeeper源码(08)leader、follower和observer", the Follower handles packets from the leader in processPacket:

    protected void processPacket(QuorumPacket qp) throws Exception {
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL: // proposal
            ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
            TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
            TxnHeader hdr = logEntry.getHeader();
            Record txn = logEntry.getTxn();
            TxnDigest digest = logEntry.getDigest();
            // omitted
            lastQueued = hdr.getZxid();
    
            // omitted
    
            // log the transaction:
            // syncProcessor persists it, and then an ACK is sent to the leader
            fzk.logRequest(hdr, txn, digest);
            // omitted
            if (om != null) {
                // omitted
            }
            break;
        case Leader.COMMIT: // commit
            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
            fzk.commit(qp.getZxid()); // commit the transaction via commitProcessor
            if (om != null) {
                // omitted
            }
            break;
        case Leader.COMMITANDACTIVATE: // Similar to COMMIT, only for a reconfig operation
            // get the new configuration from the request
            Request request = fzk.pendingTxns.element();
            SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
            QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
    
            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
            long suggestedLeaderId = buffer.getLong();
            final long zxid = qp.getZxid();
            boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
            // commit (writes the new config to the ZK tree at /zookeeper/config)
            fzk.commit(zxid);
    
            // omitted
            break;
        case Leader.UPTODATE:
            // the leader tells the follower it is up to date and may start serving clients;
            // this packet type should not normally arrive at this point
            break;
        case Leader.REVALIDATE:
            if (om == null || !om.revalidateLearnerSession(qp)) {
                revalidate(qp);
            }
            break;
        case Leader.SYNC:
            fzk.sync(); // sync command
            break;
        default:
            LOG.warn("Unknown packet type");
            break;
        }
    }
    

    Handling PROPOSAL

    syncProcessor.processRequest method

    The processRequest method puts the request into the queuedRequests queue, and a background thread consumes the queue and does the actual work.

    After persisting the request locally, it calls the downstream processor (a SendAckRequestProcessor object).

    SendAckRequestProcessor

    public void processRequest(Request si) {
        if (si.type != OpCode.sync) {
            // acknowledge that this zxid has been persisted
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
            try {
                si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
                learner.writePacket(qp, false);
            } catch (IOException e) {
                // learner.sock.close();
            }
        }
    }
    
    public void flush() throws IOException {
        try {
            learner.writePacket(null, true);
        } catch (IOException e) {
            // learner.sock.close();
        }
    }
    

    Handling COMMIT

    The request is handed to commitProcessor, which passes it on downstream to FinalRequestProcessor.
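Taken together, the PROPOSAL and COMMIT paths work like this: a follower keeps the proposals it has logged in pendingTxns and expects each COMMIT to match the oldest one, because the leader commits strictly in zxid order. The minimal sketch below uses hypothetical names and assumes persistence and the ACK happen immediately. As we read FollowerZooKeeperServer.commit, ZooKeeper treats a mismatched COMMIT as fatal and exits the process. The sketch models that case with an exception.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a follower's PROPOSAL / COMMIT handling.
final class FollowerSketch {
    private final Deque<Long> pendingTxns = new ArrayDeque<>(); // logged, not yet committed
    final List<Long> acked = new ArrayList<>();                 // ACKs sent to the leader
    final List<Long> committed = new ArrayList<>();             // handed to commitProcessor

    // PROPOSAL: logRequest queues the txn; here persisting and acking happen at once.
    void onProposal(long zxid) {
        pendingTxns.add(zxid);
        acked.add(zxid);
    }

    // COMMIT: must match the oldest pending txn.
    void onCommit(long zxid) {
        Long first = pendingTxns.peek();
        if (first == null) {
            return; // ZooKeeper only logs a warning here
        }
        if (first != zxid) {
            throw new IllegalStateException("commit for 0x" + Long.toHexString(zxid)
                + " but next pending txn is 0x" + Long.toHexString(first));
        }
        pendingTxns.poll();
        committed.add(zxid);
    }

    public static void main(String[] args) {
        FollowerSketch f = new FollowerSketch();
        f.onProposal(0x100000001L);
        f.onProposal(0x100000002L);
        f.onCommit(0x100000001L);
        System.out.println(f.committed.size()); // 1
    }
}
```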

    FinalRequestProcessor

    Covered above; omitted here.

    How the Observer processes data from the Leader

    Processor chain

    protected void setupRequestProcessors() {
        // We might consider changing the processor behaviour of
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(
            finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();
    
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }
    

    processPacket method

    protected void processPacket(QuorumPacket qp) throws Exception {
        TxnLogEntry logEntry;
        TxnHeader hdr;
        TxnDigest digest;
        Record txn;
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL:
            LOG.warn("Ignoring proposal");
            break;
        case Leader.COMMIT:
            LOG.warn("Ignoring commit");
            break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Observer started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            ((ObserverZooKeeperServer) zk).sync();
            break;
        case Leader.INFORM:
            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
            logEntry = SerializeUtils.deserializeTxn(qp.getData());
            hdr = logEntry.getHeader();
            txn = logEntry.getTxn();
            digest = logEntry.getDigest();
            Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
            request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
            request.setTxnDigest(digest);
            ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
            obs.commitRequest(request);
            break;
        case Leader.INFORMANDACTIVATE: // handle a reconfig request
            // get new designated leader from (current) leader's message
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
            long suggestedLeaderId = buffer.getLong();
    
            byte[] remainingdata = new byte[buffer.remaining()];
            buffer.get(remainingdata);
            logEntry = SerializeUtils.deserializeTxn(remainingdata);
            hdr = logEntry.getHeader();
            txn = logEntry.getTxn();
            digest = logEntry.getDigest();
            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
    
            request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
            request.setTxnDigest(digest);
            obs = (ObserverZooKeeperServer) zk;
    
            boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
    
            obs.commitRequest(request);
    
            if (majorChange) {
                throw new Exception("changes proposed in reconfig");
            }
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
        }
    }
    
  • Original article: https://www.cnblogs.com/xugf/p/18020590