• zookeeper源码(10)node增删改查及监听


    本文将从leader处理器入手,详细分析node的增删改查流程及监听器原理。

    回顾数据读写流程

    leader

    1. ZookeeperServer.processPacket封装Request并提交给业务处理器
    2. LeaderRequestProcessor做本地事务升级
    3. PrepRequestProcessor做事务准备
    4. ProposalRequestProcessor事务操作发proposal给follower节点,持久化到log文件
    5. CommitProcessor:读请求直接转发给下游处理器,事务操作等待达到quorum状态后再转发给下游处理器
    6. ToBeAppliedRequestProcessor清理toBeApplied集
    7. FinalRequestProcessor将事务写到ZKDatabase中,给客户端发响应

    follower

    1. 处理PROPOSAL:使用SyncRequestProcessor处理器持久化,之后SendAckRequestProcessor给leader发ack
    2. 处理COMMIT:提交给CommitProcessor处理器,之后FinalRequestProcessor将事务写到ZKDatabase中

    创建node

    涉及create、create2、createContainer、createTTL等命令。

    PrepRequestProcessor事务准备

    反序列化请求参数

    switch (request.type) {
    case OpCode.createContainer:
    case OpCode.create:
    case OpCode.create2:
        CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
        pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
        break;
    case OpCode.createTTL:
        // 默认不支持ttl
        CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
        pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
        break;
    // ...
    

    CreateRequest封装创建node的参数:

    /**
     * Request record carrying the arguments of a create-node call.
     * Abridged: only the fields are shown in this excerpt.
     */
    public class CreateRequest implements Record {
      private String path; // path of the node to create
      private byte[] data; // data stored in the new node
      private java.util.List acl; // ACL entries; NOTE(review): element type is not shown in this excerpt
      private int flags; // creation flags, later decoded with CreateMode.fromFlag(flags)
    }
    

    CreateTTLRequest封装创建node加ttl的参数:

    /**
     * Request record carrying the arguments of a create-node-with-TTL call.
     * Abridged: only the fields are shown in this excerpt.
     */
    public class CreateTTLRequest implements Record {
      private String path; // path of the node to create
      private byte[] data; // data stored in the new node
      private java.util.List acl; // ACL entries; NOTE(review): element type is not shown in this excerpt
      private int flags; // creation flags, later decoded with CreateMode.fromFlag(flags)
      private long ttl; // time-to-live of the node; presumably milliseconds - confirm against the TTL docs
    }
    

    事务准备

    /**
     * Turns a deserialized request record into a transaction on the request.
     * Abridged: only the create-type branch is shown; every create variant is
     * delegated to {@code pRequest2TxnCreate}.
     */
    protected void pRequest2Txn(int type, long zxid, Request request, Record record)
            throws KeeperException, IOException, RequestProcessorException {
        // ...
    
        switch (type) {
          // all four create op codes share one preparation routine
          case OpCode.create:
          case OpCode.create2:
          case OpCode.createTTL:
          case OpCode.createContainer: {
            pRequest2TxnCreate(type, request, record);
            break;
          }
        // ...
        }
    }
    
    /**
     * Prepares the transaction for a create-type request: validates the request,
     * checks the CREATE permission on the parent, computes the final path for
     * sequential nodes, sets the matching Txn object on the request and queues
     * change records for both the parent and the new node.
     * Abridged: the assignments of flags/path/acl/data/ttl and some checks are omitted.
     */
    private void pRequest2TxnCreate(
            int type, Request request, Record record) throws IOException, KeeperException {
        int flags;
        String path;
        List acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
            // assign flags and the other parameters (omitted)
        } else {
            CreateRequest createRequest = (CreateRequest) record;
            // assign flags and the other parameters (omitted)
            ttl = -1;
        }
        // CreateMode:
        // PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
        // CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
        CreateMode createMode = CreateMode.fromFlag(flags);
        // validate ephemeral node, ttl argument and check the session
        // ttl is not supported by default
        validateCreateRequest(path, createMode, request, ttl);
        String parentPath = validatePathForCreate(path, request.sessionId); // parent node path
    
        List listACL = fixupACL(path, request.authInfo, acl); // ACL carried by the request
        ChangeRecord parentRecord = getRecordForPath(parentPath); // parent node record
        // verify the CREATE permission
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) { // sequential node
            // e.g. /users/admin0000000001
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        // (omitted)
        boolean ephemeralParent = 
            EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        // the parent must not be an ephemeral node (the check using ephemeralParent is omitted here)
    
        int newCversion = parentRecord.stat.getCversion() + 1; // parent's child version + 1
        // check the byte quota
        zks.checkQuota(path, null, data, OpCode.create);
        // build the Txn object that matches the request type
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
        }
    
        TxnHeader hdr = request.getHdr();
        long ephemeralOwner = 0;
        if (createMode.isContainer()) {
            ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
        } else if (createMode.isTTL()) {
            ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
        } else if (createMode.isEphemeral()) {
            ephemeralOwner = request.sessionId; // ephemeral nodes use the sessionId
        }
        // czxid(created),mzxid(modified),ctime,mtime,version,cversion(childVersion),
        // aversion(aclVersion),ephemeralOwner,pzxid(lastModifiedChildren)
        StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
    
        // parent node: work on a copy, bump child count / cversion / pzxid
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount++;
        parentRecord.stat.setCversion(newCversion);
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        parentRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
        addChangeRecord(parentRecord);
        // the new node
        ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
        nodeRecord.data = data;
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
    }
    
    /**
     * Registers a pending (not yet committed) change: appends it to
     * {@code zks.outstandingChanges} and indexes it by path in
     * {@code zks.outstandingChangesForPath}, both under the outstandingChanges lock.
     */
    protected void addChangeRecord(ChangeRecord c) {
        synchronized (zks.outstandingChanges) {
            zks.outstandingChanges.add(c);
            // a later record for the same path replaces the earlier map entry
            zks.outstandingChangesForPath.put(c.path, c);
        }
    }
    

    outstandingChanges保存未提交的事务变化,比如在生成顺序节点时需要使用cversion值,但是在事务提交到ZKDatabase之前,库里面的值是旧的,所以在上面的代码中,是从outstandingChanges查找节点,给cversion++后再生成顺序节点。

    在事务提交之后,才会清理outstandingChanges集。

    ProposalRequestProcessor发Proposal

    /**
     * Routes a request on the leader. Learner sync requests go to the leader's
     * sync handling. Every other request is optionally forwarded downstream, and
     * if it carries a transaction header it is also proposed to the followers
     * and handed to the sync processor.
     *
     * @throws RequestProcessorException if proposing fails with an XidRolloverException
     */
    public void processRequest(Request request) throws RequestProcessorException {
        // sync command flow, not analysed here
        if (request instanceof LearnerSyncRequest) {
            zks.getLeader().processSync((LearnerSyncRequest) request);
            return;
        }

        // hand the request to the downstream processor
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request);
        }

        // only transactional requests (those with a header) need a proposal and a disk write
        if (request.getHdr() == null) {
            return;
        }

        try {
            zks.getLeader().propose(request);
        } catch (XidRolloverException rollover) {
            throw new RequestProcessorException(rollover.getMessage(), rollover);
        }
        // write the transaction log to file;
        // afterwards the AckRequestProcessor acks to the leader
        syncProcessor.processRequest(request);
    }
    

    CommitProcessor提交事务

    /**
     * Enqueues a request for the commit processor: every request goes into
     * {@code queuedRequests}; requests for which {@code needCommit} is true are
     * additionally put into {@code queuedWriteRequests}. The read/write counters
     * are updated accordingly and the processor is woken up.
     */
    public void processRequest(Request request) {
        request.commitProcQueueStartTime = Time.currentElapsedTime();
        queuedRequests.add(request); // queue of all requests
        if (needCommit(request)) { // requests that need a commit also enter the write queue
            queuedWriteRequests.add(request);
            numWriteQueuedRequests.incrementAndGet();
        } else {
            numReadQueuedRequests.incrementAndGet();
        }
        wakeup();
    }
    

    run方法对比queuedRequests、queuedWriteRequests、committedRequests这几个队列,将提交成功的请求或读请求转发给下游的ToBeAppliedRequestProcessor处理器。

    FinalRequestProcessor应用事务

    该处理器位于处理器链的末尾,负责将事务应用到ZKDatabase、查询数据、返回响应。

    applyRequest

    该方法将事务应用到ZKDatabase中:

    /**
     * Applies the request's transaction via {@code zks.processTxn} and returns
     * its result. Abridged: closeSession handling and metrics are omitted.
     */
    private ProcessTxnResult applyRequest(Request request) {
        // apply the transaction
        ProcessTxnResult rc = zks.processTxn(request);
    
        // closeSession
    
        // metrics
    
        return rc;
    }
    

    zks.processTxn负责处理session、处理事务、清理outstandingChanges集。重点看一下处理事务的步骤。

    processTxn

    /**
     * Applies one transaction to the tree and reports the outcome in a
     * {@code ProcessTxnResult} whose identifying fields are copied from the header.
     * Abridged: only the create-type cases are shown; the catch part of the try
     * and the return are omitted.
     */
    public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();
    
        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            case OpCode.create:
                // plain create: no Stat is requested (outputStat is null)
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    // ephemeral nodes are owned by the client id from the header, others use 0
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    null);
                break;
            case OpCode.create2:
                // create2: same as create, but the new node's Stat is returned in rc.stat
                CreateTxn create2Txn = (CreateTxn) txn;
                rc.path = create2Txn.getPath();
                Stat stat = new Stat();
                createNode(
                    create2Txn.getPath(),
                    create2Txn.getData(),
                    create2Txn.getAcl(),
                    create2Txn.getEphemeral() ? header.getClientId() : 0,
                    create2Txn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            case OpCode.createTTL:
                // TTL node: the ephemeralOwner value is derived from the ttl
                CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
                rc.path = createTtlTxn.getPath();
                stat = new Stat();
                createNode(
                    createTtlTxn.getPath(),
                    createTtlTxn.getData(),
                    createTtlTxn.getAcl(),
                    EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                    createTtlTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            case OpCode.createContainer:
                // container node: uses the fixed CONTAINER_EPHEMERAL_OWNER marker
                CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
                rc.path = createContainerTxn.getPath();
                stat = new Stat();
                createNode(
                    createContainerTxn.getPath(),
                    createContainerTxn.getData(),
                    createContainerTxn.getAcl(),
                    EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                    createContainerTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            // ...
            }
        }
        // ...
    }
    

    createNode

    /**
     * Adds a new node to the tree under its parent and fires the related watches.
     *
     * <p>Under the parent's lock this updates the parent's cversion/pzxid, adds the
     * child name to the parent, stores the new node in {@code nodes} and records the
     * path in the container / ttl / per-session ephemeral bookkeeping, depending on
     * {@code ephemeralOwner}. Afterwards NodeCreated is triggered on the new path
     * and NodeChildrenChanged on the parent path.
     *
     * @param path            full path of the node to create
     * @param data            node data
     * @param acl             ACL list (NOTE(review): element type is not visible in this excerpt)
     * @param ephemeralOwner  0 for a regular node, otherwise a session id or a container/TTL marker
     * @param parentCVersion  cversion to set on the parent; -1 means "parent's current cversion + 1"
     * @param zxid            zxid of the transaction
     * @param time            time used for the new node's stat
     * @param outputStat      if not null, receives a copy of the new node's stat
     * @throws NoNodeException     if the parent node does not exist
     * @throws NodeExistsException if the parent already has a child with this name
     */
    public void createNode(final String path, byte[] data, List acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws NoNodeException, NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = createStat(zxid, time, ephemeralOwner);
        DataNode parent = nodes.get(parentName); // the parent node must exist
        if (parent == null) {
            throw new NoNodeException();
        }
        synchronized (parent) {
            Long acls = aclCache.convertAcls(acl);

            Set<String> children = parent.getChildren();
            if (children.contains(childName)) { // the node must not exist yet
                throw new NodeExistsException();
            }

            nodes.preChange(parentName, parent);
            if (parentCVersion == -1) { // no explicit version given: childVersion++
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }

            // only ever move the parent's cversion forward
            if (parentCVersion > parent.stat.getCversion()) {
                parent.stat.setCversion(parentCVersion);
                parent.stat.setPzxid(zxid);
            }
            // create the node
            DataNode child = new DataNode(data, acls, stat);
            parent.addChild(childName);
            nodes.postChange(parentName, parent);
            nodeDataSize.addAndGet(getNodeSize(path, child.data));
            nodes.put(path, child); // maintain the NodeHashMap
            // bookkeeping for container / ttl / ephemeral nodes
            EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                // paths of all ephemeral nodes owned by this session
                HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
                synchronized (list) {
                    list.add(path);
                }
            }
            // hand the node stat back to the caller
            if (outputStat != null) {
                child.copyStat(outputStat);
            }
        }
        // now check if its one of the zookeeper node child (omitted)

        // trigger the NodeCreated watch
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
        // trigger the parent's NodeChildrenChanged watch; children of the root have an empty parentName
        childWatches.triggerWatch(
            parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
    }
    

    返回响应

    case OpCode.create: {
        lastOp = "CREA";
        rsp = new CreateResponse(rc.path); // 创建Response
        err = Code.get(rc.err); // processTxn的err
        requestPathMetricsCollector.registerRequest(request.type, rc.path);
        break;
    }
    case OpCode.create2:
    case OpCode.createTTL:
    case OpCode.createContainer: {
        lastOp = "CREA";
        rsp = new Create2Response(rc.path, rc.stat); // 创建Response
        err = Code.get(rc.err); // processTxn的err
        requestPathMetricsCollector.registerRequest(request.type, rc.path);
        break;
    }
    

    最后会使用cnxn把响应返回给客户端:

    ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
    cnxn.sendResponse(hdr, rsp, "response");
    

    EphemeralType

    • VOID
    • NORMAL
    • CONTAINER
    • TTL

    ephemeralOwner标识znode是临时的,以及哪个会话创建了该节点。通过zookeeper.extendedTypesEnabled属性可以启用ttl节点等扩展功能。ephemeralOwner的"特殊位"用于表示启用了哪个功能,而ephemeralOwner的剩余位是特定于功能的。

    当zookeeper.extendedTypesEnabled为true时,将启用扩展类型。扩展ephemeralOwner填充高8位(0xff00000000000000L),高8位之后的两个字节用于表示ephemeralOwner扩展特征,剩余5个字节由该功能指定,可用于所需的任何目的。

    目前,唯一扩展功能是TTL节点,扩展特征值为0。对于TTL节点,ephemeralOwner具有0xff的高8位,接下来2个字节是0,后面的5个字节是以毫秒为单位的ttl值。因此ttl值为1毫秒的ephemeralOwner是0xff00000000000001。

    要添加新的扩展功能:

    • 向枚举添加新名称
    • 定义常量EXTENDED_BIT_XXXX,取下一个可用值(TTL之后为0x0001)
    • 通过静态初始值设定项向extendedFeatureMap添加映射

    注意:从技术上讲,容器节点也是扩展类型,但由于它是在该功能之前实现的,因此采用特殊表示。按照定义,仅最高位被置位(0x8000000000000000L)的ephemeralOwner表示容器节点(无论是否启用扩展类型)。

    ttl节点

    • 默认不开启,使用zookeeper.extendedTypesEnabled=true系统属性开启
    • Added in 3.5.3
    • 创建PERSISTENT或PERSISTENT_SEQUENTIAL节点时,可以设置以毫秒为单位的ttl。如果znode没有在ttl内修改,并且没有子节点,它将在将来的某个时候成为服务器删除的候选节点

    container节点

    • Added in 3.5.3
    • 当container节点的最后一个子节点被删除时,该container节点将成为服务器在未来某个时候删除的候选节点

    Stat类

    封装节点属性,字段如下:

    • czxid The zxid of the change that caused this znode to be created.
    • mzxid The zxid of the change that last modified this znode.
    • pzxid The zxid of the change that last modified children of this znode.
    • ctime The time in milliseconds from epoch when this znode was created.
    • mtime The time in milliseconds from epoch when this znode was last modified.
    • version The number of changes to the data of this znode.
    • cversion The number of changes to the children of this znode.
    • aversion The number of changes to the ACL of this znode.
    • ephemeralOwner The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
    • dataLength The length of the data field of this znode.
    • numChildren The number of children of this znode.

    删除node

    涉及delete、deleteContainer等命令。

    PrepRequestProcessor事务准备

    反序列化请求参数

    /**
     * Deserializes the request record according to the request type and passes
     * it to {@code pRequest2Txn} together with the next zxid.
     * Abridged: only the delete / deleteContainer cases are shown; the catch part
     * of the try is omitted.
     */
    private void pRequestHelper(Request request) {
        try {
            switch (request.type) {
            // ...
            case OpCode.deleteContainer:
                DeleteContainerRequest deleteContainerRequest =
                    request.readRequestRecord(DeleteContainerRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
                break;
            case OpCode.delete:
                DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
                break;
            }
            // ...
        }
    }
    

    DeleteContainerRequest类:

    /**
     * Request record for deleting a container node.
     * Abridged: only the field is shown in this excerpt.
     */
    public class DeleteContainerRequest implements Record {
        private String path; // path of the node to delete
    }
    

    DeleteRequest类:

    /**
     * Request record for deleting a node.
     * Abridged: only the fields are shown in this excerpt.
     */
    public class DeleteRequest implements Record {
      private String path; // path of the node to delete
      private int version; // version supplied by the client, checked against the node's version during preparation
    }
    

    事务准备

    /**
     * Turns a deserialized request record into a transaction on the request.
     * Abridged: only the deleteContainer and delete branches are shown, and the
     * addChangeRecord calls are omitted. Both branches refuse to delete a node
     * that still has children and set a {@code DeleteTxn} on the request.
     */
    protected void pRequest2Txn(int type, long zxid, Request request,
                 Record record) throws KeeperException, IOException, RequestProcessorException {
        // (omitted)
    
        switch (type) {
        // (omitted)
        case OpCode.deleteContainer: {
            DeleteContainerRequest txn = (DeleteContainerRequest) record;
            String path = txn.getPath();
            String parentPath = getParentPathAndValidate(path);
            ChangeRecord nodeRecord = getRecordForPath(path); // node to be deleted
            if (nodeRecord.childCount > 0) { // a node with children cannot be deleted
                throw new KeeperException.NotEmptyException(path);
            }
            // a normal ephemeral node is rejected for deleteContainer
            if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
                throw new KeeperException.BadVersionException(path);
            }
            ChangeRecord parentRecord = getRecordForPath(parentPath); // parent node
            request.setTxn(new DeleteTxn(path));
            // addChangeRecord calls omitted
            break;
        }
        case OpCode.delete:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            DeleteRequest deleteRequest = (DeleteRequest) record;
            String path = deleteRequest.getPath();
            String parentPath = getParentPathAndValidate(path);
            ChangeRecord parentRecord = getRecordForPath(parentPath); // parent node
            // check the DELETE permission (against the parent's ACL)
            zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
            ChangeRecord nodeRecord = getRecordForPath(path); // node to be deleted
            checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // check the version
            if (nodeRecord.childCount > 0) { // a node with children cannot be deleted
                throw new KeeperException.NotEmptyException(path);
            }
            request.setTxn(new DeleteTxn(path));
            // addChangeRecord calls omitted
            break;
        }
    }
    

    FinalRequestProcessor应用事务

    processTxn

    /**
     * Applies one transaction to the tree and reports the outcome in a
     * {@code ProcessTxnResult}.
     * Abridged: only the delete / deleteContainer case is shown; the catch part of
     * the try and the return are omitted.
     */
    public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();
    
        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            // ...
            // both delete variants carry a DeleteTxn and end up in deleteNode
            case OpCode.delete:
            case OpCode.deleteContainer:
                DeleteTxn deleteTxn = (DeleteTxn) txn;
                rc.path = deleteTxn.getPath();
                deleteNode(deleteTxn.getPath(), header.getZxid());
                break;
            }
            // ...
        }
    }
    

    deleteNode

    /**
     * Removes a node from the tree and fires the related watches.
     *
     * <p>The child name is removed from the parent (advancing the parent's pzxid
     * when {@code zxid} is newer), the node is removed from {@code nodes}, its ACL
     * usage and data size are released, and the path is dropped from the
     * container / ttl / per-session ephemeral bookkeeping. Finally NodeDeleted is
     * triggered on the path and NodeChildrenChanged on the parent path.
     *
     * @param path full path of the node to delete
     * @param zxid zxid of the transaction
     * @throws NoNodeException if the parent or the node itself does not exist
     */
    public void deleteNode(String path, long zxid) throws NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);

        DataNode parent = nodes.get(parentName);
        if (parent == null) { // the parent node must exist
            throw new NoNodeException();
        }
        synchronized (parent) {
            nodes.preChange(parentName, parent);
            parent.removeChild(childName);
            if (zxid > parent.stat.getPzxid()) {
                parent.stat.setPzxid(zxid); // The zxid of the change that last modified children of this znode
            }
            nodes.postChange(parentName, parent);
        }

        DataNode node = nodes.get(path); // the node being deleted
        if (node == null) {
            throw new NoNodeException();
        }
        nodes.remove(path); // remove from the NodeHashMap
        synchronized (node) { // release the ACL usage and the accounted data size
            aclCache.removeUsage(node.acl);
            nodeDataSize.addAndGet(-getNodeSize(path, node.data));
        }

        // drop the path from the ephemeral / container / ttl bookkeeping
        synchronized (parent) {
            long owner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(owner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.remove(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.remove(path);
            } else if (owner != 0) {
                // named so that it does not shadow the `nodes` field used above
                Set<String> sessionPaths = ephemerals.get(owner);
                if (sessionPaths != null) {
                    synchronized (sessionPaths) {
                        sessionPaths.remove(path);
                    }
                }
            }
        }

        // (omitted)

        // trigger the NodeDeleted watches; watchers already notified via dataWatches
        // are passed to the childWatches call so they can be suppressed there
        WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
        childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
        // trigger the parent's NodeChildrenChanged watch; children of the root have an empty parentName
        childWatches.triggerWatch(
            "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
    }
    

    设置node数据

    PrepRequestProcessor事务准备

    反序列化请求参数

    /**
     * Deserializes the request record according to the request type and passes
     * it to {@code pRequest2Txn} together with the next zxid.
     * Abridged: only the setData case is shown; the catch part of the try is omitted.
     */
    private void pRequestHelper(Request request) {
        try {
            switch (request.type) {
            // ...
            case OpCode.setData:
                SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
                break;
            // other case
            }
        }
        // ...
    }
    

    SetDataRequest类:

    /**
     * Request record for setting a node's data.
     * Abridged: only the fields are shown in this excerpt.
     */
    public class SetDataRequest implements Record {
      private String path; // path of the node to update
      private byte[] data; // new node data
      private int version; // version supplied by the client, checked against the node's version during preparation
    }
    

    事务准备

    /**
     * Turns a deserialized request record into a transaction on the request.
     * Abridged: only the setData branch is shown; the declarations of
     * {@code path} / {@code nodeRecord} and the addChangeRecord call are omitted.
     */
    protected void pRequest2Txn(int type, long zxid, Request request,
              Record record) throws KeeperException, IOException, RequestProcessorException {
        // (omitted)
    
        switch (type) {
        // ...
        case OpCode.setData:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            SetDataRequest setDataRequest = (SetDataRequest) record;
            path = setDataRequest.getPath();
            validatePath(path, request.sessionId);
            nodeRecord = getRecordForPath(path); // record of the node
            // check the WRITE permission
            zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
            // check the byte quota
            zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
            // check the requested version and compute the new one (version++)
            int newVersion = checkAndIncVersion(
                nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
            // create the SetDataTxn
            request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
            // addChangeRecord
            break;
        // other case
        }
    }
    

    FinalRequestProcessor应用事务

    processTxn

    /**
     * Applies one transaction to the tree and reports the outcome in a
     * {@code ProcessTxnResult}.
     * Abridged: only the setData case is shown; the catch part of the try and the
     * return are omitted.
     */
    public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();
    
        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            // other case
            case OpCode.setData:
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                // setData returns the node's Stat after the update
                rc.stat = setData(
                    setDataTxn.getPath(),
                    setDataTxn.getData(),
                    setDataTxn.getVersion(),
                    header.getZxid(),
                    header.getTime());
                break;
            // other case
            }
        }
        // ...
    }
    

    setData

    /**
     * Replaces the data of an existing node, updates its mtime / mzxid / version
     * and triggers the NodeDataChanged watch on the path.
     *
     * @return a copy of the node's stat taken after the update
     * @throws NoNodeException if no node exists at {@code path}
     */
    public Stat setData(String path, byte[] data, int version,
                        long zxid, long time) throws NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) { // the node must exist
            throw new NoNodeException();
        }
        byte[] lastData; // previous data; presumably used by the part omitted below - verify
        synchronized (n) {
            lastData = n.data;
            nodes.preChange(path, n);
            n.data = data; // node data
            n.stat.setMtime(time); // modification time
            n.stat.setMzxid(zxid); // modification zxid
            n.stat.setVersion(version); // version
            n.copyStat(s);
            nodes.postChange(path, n);
        }
    
        // (omitted)
    
        // trigger the NodeDataChanged watch
        dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
        return s;
    }
    

    查询node数据

    PrepRequestProcessor验证session

    经过该处理器时,只做session验证。

    之后的ProposalRequestProcessor、CommitProcessor、ToBeAppliedRequestProcessor都是直接通过,不做事务处理,直接交给FinalRequestProcessor处理器查询数据、发送响应。

    FinalRequestProcessor查询数据

    使用handleGetDataRequest方法查询数据:

    /**
     * Serves a getData request: looks up the node, checks the READ permission and
     * returns the node's data together with its stat. When the request asks for a
     * watch, the connection itself is registered as the watcher on the path.
     *
     * @throws KeeperException.NoNodeException if no node exists at the requested path
     */
    private Record handleGetDataRequest(
            Record request, ServerCnxn cnxn, List authInfo) throws KeeperException, IOException {
        final GetDataRequest getDataRequest = (GetDataRequest) request;
        final String path = getDataRequest.getPath();

        final DataNode node = zks.getZKDatabase().getNode(path);
        if (node == null) {
            throw new KeeperException.NoNodeException();
        }

        // check the READ permission
        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(node), ZooDefs.Perms.READ, authInfo, path, null);

        // read the data; a non-null watcher argument adds a watch on the path
        final Stat stat = new Stat();
        final byte[] payload =
            zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
        return new GetDataResponse(payload, stat);
    }
    

    GetDataRequest类:

    /**
     * Request record for reading a node's data.
     * Abridged: only the fields are shown in this excerpt.
     */
    public class GetDataRequest implements Record {
      private String path; // path of the node to read
      private boolean watch; // true if a watch should be set on the path as part of the read
    }
    

    节点监听

    addWatch命令

    case OpCode.addWatch: {
        lastOp = "ADDW";
        AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
        // 最终使用DataTree的addWatch方法注册监听器
        // cnxn是ServerCnxn对象,实现了Watcher接口
        zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
        rsp = new ErrorResponse(0);
        break;
    }
    

    DataTree的addWatch方法

    /**
     * Registers a watcher on {@code basePath}. The watch is always added to
     * {@code dataWatches}; it is added to {@code childWatches} as well unless the
     * mode is PERSISTENT_RECURSIVE.
     *
     * @param mode numeric watch mode, converted with {@code WatcherMode.fromZooDef}
     */
    public void addWatch(String basePath, Watcher watcher, int mode) {
        // PERSISTENT|PERSISTENT_RECURSIVE
        final WatcherMode resolvedMode = WatcherMode.fromZooDef(mode);

        // dataWatches and childWatches are WatchManager objects
        dataWatches.addWatch(basePath, watcher, resolvedMode);
        if (resolvedMode == WatcherMode.PERSISTENT_RECURSIVE) {
            return;
        }
        childWatches.addWatch(basePath, watcher, resolvedMode);
    }
    

    WatcherMode枚举

    /**
     * Modes a watch can be registered with.
     * Abridged: the constructor and accessors are omitted; NOTE(review): the two
     * boolean arguments look like (persistent, recursive) - confirm against the full source.
     */
    public enum WatcherMode {
        STANDARD(false, false),
        PERSISTENT(true, false), // persistent=0
        PERSISTENT_RECURSIVE(true, true), // persistentRecursive=1
        ;
    }
    

    PERSISTENT和PERSISTENT_RECURSIVE是3.6.0版本新增的特性。

    Watcher接口

    实现类需要实现process方法:

    void process(WatchedEvent event);
    

    WatchedEvent代表一个监听事件:

    /**
     * A single watch event delivered to a Watcher.
     * Abridged: only the fields are shown in this excerpt.
     */
    public class WatchedEvent {
        // current state of the zk server
        private final KeeperState keeperState;
        // NodeCreated|NodeDeleted|NodeDataChanged|NodeChildrenChanged etc.
        private final EventType eventType;
        // path the event refers to
        private final String path;
        // zxid of the transaction that caused the event
        private final long zxid;
    }
    

    重要的实现类:

    • NIOServerCnxn
    • NettyServerCnxn

    WatchManager类

    This class manages watches. It allows watches to be associated with a string and removes watchers and their watches in addition to managing triggers.

    核心字段:

    // path -> Watcher集
    private final Map> watchTable = new HashMap<>();
    // Watcher -> path->WatchStats(PERSISTENT|STANDARD + PERSISTENT|PERSISTENT_RECURSIVE等)
    private final Map> watch2Paths = new HashMap<>();
    

    WatchStats类

    /**
     * Immutable set of watcher modes attached to one (watcher, path) pair, stored
     * as a bit mask. All eight possible combinations are pre-built and shared, so
     * instances can be compared by reference.
     * Abridged: further members are omitted at the end of the class.
     */
    public final class WatchStats {
        // one shared instance per flag value; the array index equals the flag value
        private static final WatchStats[] WATCH_STATS = new WatchStats[] {
                new WatchStats(0), // NONE
                new WatchStats(1), // STANDARD
                new WatchStats(2), // PERSISTENT
                new WatchStats(3), // STANDARD + PERSISTENT
                new WatchStats(4), // PERSISTENT_RECURSIVE
                new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
                new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
                new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
        };

        /** The empty mode set. */
        public static final WatchStats NONE = WATCH_STATS[0];

        private final int flags;

        private WatchStats(int flags) {
            this.flags = flags;
        }

        /** Maps a mode to its single-bit flag, based on the enum ordinal. */
        private static int modeToFlag(WatcherMode mode) {
            // STANDARD             -> 1 << 0 = 1 (0001)
            // PERSISTENT           -> 1 << 1 = 2 (0010)
            // PERSISTENT_RECURSIVE -> 1 << 2 = 4 (0100)
            return 1 << mode.ordinal();
        }

        /** Returns the shared instance for the current modes plus {@code mode}. */
        public WatchStats addMode(WatcherMode mode) {
            // OR keeps every mode that is already set
            return WATCH_STATS[this.flags | modeToFlag(mode)];
        }

        /** Returns the shared instance for the current modes without {@code mode}. */
        public WatchStats removeMode(WatcherMode mode) {
            // AND with the inverted flag clears just that bit
            final int remaining = this.flags & ~modeToFlag(mode);
            return remaining == 0 ? NONE : WATCH_STATS[remaining];
        }

        // ...
    }
    

    addWatch

    /**
     * Registers {@code watcher} on {@code path} with the given mode.
     *
     * <p>The watcher is added to the path's watcher set in {@code watchTable}, and
     * the mode is merged into the watcher's {@code WatchStats} for that path in
     * {@code watch2Paths}.
     *
     * @return {@code true} if the mode was newly added for this (watcher, path)
     *         pair, {@code false} if that mode was already registered
     */
    public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
        // path -> watchers; small initial capacity as in the original
        Set<Watcher> list = watchTable.computeIfAbsent(path, k -> new HashSet<>(4));
        list.add(watcher);

        // cnxns typically have many watches, so use default cap here
        Map<String, WatchStats> paths = watch2Paths.computeIfAbsent(watcher, k -> new HashMap<>());

        WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
        WatchStats newStats = stats.addMode(watcherMode);

        // WatchStats instances are shared per flag value, so an unchanged reference
        // means the mode was already present
        if (newStats == stats) {
            return false;
        }

        paths.put(path, newStats);
        if (watcherMode.isRecursive()) {
            ++recursiveWatchQty;
        }
        return true;
    }
    

    triggerWatch

    /**
     * Collects the watchers that must be notified about an event on {@code path}
     * and calls {@code process} on each of them.
     *
     * <p>The path and then each of its ancestors are visited. Watchers registered on
     * the path itself are always collected, and their STANDARD mode is removed
     * (one-shot). Watchers on an ancestor path are collected only if they hold the
     * PERSISTENT_RECURSIVE mode there.
     *
     * @param supress watchers that must not be notified again; may be {@code null}
     * @return the collected watchers wrapped in a {@code WatcherOrBitSet}
     */
    public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
        Set<Watcher> watchers = new HashSet<>();
        synchronized (this) {
            // path iterator walking from the node up through its parents, e.g. for /apps/app1/name:
            // next = /apps/app1/name, next = /apps/app1, next = /apps ...
            PathParentIterator pathParentIterator = getPathParentIterator(path);
            for (String localPath : pathParentIterator.asIterable()) {
                // watchers registered on this path; most ancestor paths have none
                Set<Watcher> thisWatchers = watchTable.get(localPath);
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    // WatchStats of this watcher for this path
                    Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                    WatchStats stats = paths.get(localPath);
                    if (stats == null) {
                        // watchTable and watch2Paths disagree; skip instead of failing
                        continue;
                    }
                    if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher); // watcher on the event path itself
                        WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                        if (newStats == WatchStats.NONE) { // a STANDARD-only watch is removed once triggered
                            iterator.remove();
                            paths.remove(localPath);
                        } else if (newStats != stats) {
                            paths.put(localPath, newStats);
                        }
                    } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                        // watchers on a parent path are notified only in recursive mode
                        watchers.add(watcher);
                    }
                }
                if (thisWatchers.isEmpty()) {
                    watchTable.remove(localPath);
                }
            }
        }
        // (omitted)

        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }

        // (omitted)

        return new WatcherOrBitSet(watchers);
    }
    

    NIOServerCnxn

    上面查找到watchers之后会触发process方法,看一下NIOServerCnxn的方法实现:

    /**
     * Pushes a watch event to the client over this connection as a notification
     * and adds the response size to the WATCH_BYTES metric.
     */
    public void process(WatchedEvent event) {
        final ReplyHeader notificationHeader =
            new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);

        // convert to WatcherEvent, the form that can be sent over the network
        final WatcherEvent wireEvent = event.getWrapper();

        // push the event to the client and account for the bytes sent
        ServerMetrics.getMetrics().WATCH_BYTES.add(
            sendResponse(notificationHeader, wireEvent, "notification", null, null, ZooDefs.OpCode.error));
    }
    
  • 相关阅读:
    STL源码阅读笔记-内存空间的分配和回收
    ai智能生成文章-智能生成文章软件
    android studio 自带模拟器进行 Root 及 Xposed安装
    给 spyter/all-spark-notebook 添加scala支持
    神经网络照片解读下载,神经网络识别图像原理
    【STM32】存储器和位带映射(bit band mapping)
    数量关系(蒙题技巧)
    树莓派4B-高精度驱动步进电机
    C++中string的用法总结+底层剖析
    prtorch.数据的导入与导出
  • 原文地址:https://www.cnblogs.com/xugf/p/18065356