• zookeeper源码(05)数据存储


    本文详细分析一下zookeeper的数据存储。

    ZKDatabase

    维护zookeeper服务器内存数据库,包括session、dataTree和committedlog数据,从磁盘读取日志和快照后启动。

    关键字段

    // Tree of data nodes
    protected DataTree dataTree;
    // NOTE(review): generic parameters appear to have been dropped from this listing;
    // presumably sessionId -> timeout (Long -> Integer) — confirm against the real source
    protected ConcurrentHashMap sessionsWithTimeouts;
    protected FileTxnSnapLog snapLog; // used to operate on the underlying data files
    // zxid of the first and of the last entry in committedLog
    protected long minCommittedLog, maxCommittedLog;
    // Maximum capacity of committedLog, 500 by default
    public int commitLogCount;
    // Holds the most recently committed requests; can be used for fast follower sync
    // NOTE(review): addCommittedProposal adds Proposal objects, so the element type is presumably Proposal
    protected Queue committedLog = new ArrayDeque<>();
    
    // addCommittedProposal takes the write lock of logLock while it updates committedLog
    protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
    // Set to true by loadDataBase, fastForwardDataBase and deserializeSnapshot
    private volatile boolean initialized = false;
    
    // txnlog counter
    private AtomicInteger txnCount = new AtomicInteger(0);
    

    构造方法

    /**
     * Builds an empty in-memory database on top of the given snapshot/txn-log accessor.
     *
     * @param snapLog accessor for the on-disk snapshot and transaction log files
     */
    public ZKDatabase(FileTxnSnapLog snapLog) {
        this.dataTree = createDataTree();
        this.sessionsWithTimeouts = new ConcurrentHashMap<>();
        this.snapLog = snapLog;

        // snapshotSizeFactor is initialized here (default 0.33)
        // commitLogCount is initialized here (default 500)
    }
    
    /**
     * Factory hook for the data tree used by this database.
     *
     * @return a freshly constructed {@code DataTree}
     */
    public DataTree createDataTree() {
        DataTree tree = new DataTree();
        return tree;
    }
    

    创建DataTree对象:创建/zookeeper/quota、/zookeeper/config节点,创建dataWatches和childWatches对象(使用WatchManager实现类)。

    主要方法

    // 返回committedLog集
    public synchronized Collection getCommittedLog();
    // 返回dataTree.lastProcessedZxid的值
    public long getDataTreeLastProcessedZxid();
    // 返回dataTree.getSessions()集
    public Collection getSessions();
    // 返回sessionsWithTimeouts的size
    public long getSessionCount();
    // 从磁盘加载dataTree并把txnLog加载到committedLog中
    public long loadDataBase() throws IOException;
    // 从磁盘加载txnLog到committedLog中
    public long fastForwardDataBase() throws IOException;
    // 使用addCommittedProposal方法添加committedLog
    private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest);
    // 添加committedLog
    public void addCommittedProposal(Request request);
    // 从txnLog加载Proposal
    public Iterator getProposalsFromTxnLog(long startZxid, long sizeLimit);
    // 使用dataTree.removeCnxn(cnxn)
    public void removeCnxn(ServerCnxn cnxn);
    // 使用dataTree.killSession(sessionId, zxid)
    public void killSession(long sessionId, long zxid);
    // 使用dataTree.dumpEphemerals(pwriter)
    public void dumpEphemerals(PrintWriter pwriter);
    // 使用dataTree.getEphemerals()
    public Map<Long, Set<String>> getEphemerals();
    // 使用dataTree.getNodeCount()
    public int getNodeCount();
    // 使用dataTree.getEphemerals(sessionId)
    public Set getEphemerals(long sessionId);
    // 给dataTree.lastProcessedZxid赋值
    public void setlastProcessedZxid(long zxid);
    // 使用dataTree.processTxn(hdr, txn, digest)
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest);
    // 使用dataTree.statNode(path, serverCnxn)
    public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException;
    // 使用dataTree.getNode(path)
    public DataNode getNode(String path);
    // 使用dataTree.getData(path, stat, watcher)
    public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
    // 使用dataTree.setWatches方法实现
    public void setWatches(long relativeZxid, List dataWatches,
                           List existWatches, List childWatches,
                           List persistentWatches, List persistentRecursiveWatches,
                           Watcher watcher);
    // 使用dataTree.addWatch(basePath, watcher, mode)
    public void addWatch(String basePath, Watcher watcher, int mode);
    // 使用dataTree.getChildren(path, stat, watcher)
    public List getChildren(
        String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
    // 使用dataTree.getAllChildrenNumber(path)
    public int getAllChildrenNumber(String path) throws KeeperException.NoNodeException;
    // Truncate the ZKDatabase to the specified zxid
    public boolean truncateLog(long zxid) throws IOException;
    // Deserialize a snapshot from an input archive
    public void deserializeSnapshot(InputArchive ia) throws IOException;
    // Deserialize a snapshot that contains FileHeader from an input archive
    // It is used by the admin restore command
    public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException;
    // Serialize the snapshot
    public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException;
    // 使用snapLog.append(si)保存数据,txnCount++
    public boolean append(Request si) throws IOException;
    // 使用snapLog.rollLog()滚动底层txnLog
    public void rollLog() throws IOException;
    // 使用snapLog.commit()提交底层txnLog
    public void commit() throws IOException;
    // 初始化/zookeeper/config数据,集群启动时已介绍
    public synchronized void initConfigInZKDatabase(QuorumVerifier qv);
    // 使用dataTree.containsWatcher(path, type, watcher)
    public boolean containsWatcher(String path, WatcherType type, Watcher watcher);
    // 使用dataTree.removeWatch(path, type, watcher)
    public boolean removeWatch(String path, WatcherType type, Watcher watcher);
    

    loadDataBase方法

    从磁盘加载dataTree并把txnLog加载到committedLog中:

    /**
     * Loads the database from disk and marks it as initialized.
     *
     * @return the zxid returned by {@code snapLog.restore}
     * @throws IOException if restoring from disk fails
     */
    public long loadDataBase() throws IOException {
        long startTime = Time.currentElapsedTime();
        // restore() does two things:
        //   1. loads dataTree from the snapshot
        //   2. uses fastForwardFromEdits to replay the txnLog into dataTree and committedLog
        final long restoredZxid =
            snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        // ... (omitted in this excerpt)
        return restoredZxid;
    }
    

    fastForwardDataBase方法

    从txnLog加载dataTree和committedlog集:

    /**
     * Replays the transaction log into the data tree and marks the database as initialized.
     *
     * @return the zxid returned by {@code snapLog.fastForwardFromEdits}
     * @throws IOException if reading the transaction log fails
     */
    public long fastForwardDataBase() throws IOException {
        // During replay, commitProposalPlaybackListener calls addCommittedProposal
        // so that committedLog is filled as well
        final long replayedZxid = snapLog.fastForwardFromEdits(
            dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return replayedZxid;
    }
    

    addCommittedProposal方法

    /**
     * Wraps a replayed transaction in a {@code Request} and appends it to the committed log.
     *
     * @param hdr    header of the transaction
     * @param txn    transaction record
     * @param digest digest attached to the resulting request
     */
    private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
        Request replayed = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        replayed.setTxnDigest(digest);
        addCommittedProposal(replayed);
    }
    
    /**
     * Appends a committed request to {@code committedLog} under the write lock, evicting the
     * oldest entry once the log has grown beyond {@code commitLogCount}, and keeps
     * {@code minCommittedLog}/{@code maxCommittedLog} in step with the queue contents.
     *
     * @param request the committed request to record
     */
    public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        // Acquire before entering try: if lock() itself failed, the finally block
        // must not try to release a lock this thread never obtained.
        wl.lock();
        try {
            if (committedLog.size() > commitLogCount) {
                // drop the oldest proposal; the new head defines the minimum zxid
                committedLog.remove();
                minCommittedLog = committedLog.peek().packet.getZxid();
            }
            if (committedLog.isEmpty()) {
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }
            byte[] data = request.getSerializeData();
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }
    

    getProposalsFromTxnLog方法

    从txnlog获取Proposal,只填充packet字段:

    /**
     * Returns an iterator over proposals read from the transaction log, starting at
     * {@code startZxid}.
     *
     * <p>An empty iterator is returned when {@code sizeLimit} is negative, when the log does
     * not start at or before {@code startZxid}, when the stored size exceeds a positive
     * {@code sizeLimit}, or when reading the log fails.
     *
     * @param startZxid zxid the returned proposals must start from
     * @param sizeLimit maximum storage size to read; 0 disables the size check, negative disables reading
     * @return an iterator over proposals, or {@code TxnLogProposalIterator.EMPTY_ITERATOR}
     */
    public Iterator getProposalsFromTxnLog(long startZxid, long sizeLimit) {
        if (sizeLimit < 0) {
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }

        TxnIterator itr = null;
        try {
            // Read from the txnLog files (implemented underneath by FileTxnIterator)
            itr = snapLog.readTxnLog(startZxid, false);

            // If we cannot guarantee that this is strictly the starting txn
            // after a given zxid, we should fail.
            if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) {
                itr.close();
                return TxnLogProposalIterator.EMPTY_ITERATOR;
            }

            if (sizeLimit > 0) {
                long txnSize = itr.getStorageSize();
                if (txnSize > sizeLimit) {
                    itr.close();
                    return TxnLogProposalIterator.EMPTY_ITERATOR;
                }
            }
        } catch (IOException e) {
            // readTxnLog may have thrown before itr was assigned, so guard against null
            if (itr != null) {
                try {
                    itr.close();
                } catch (IOException ignored) {
                    // best-effort cleanup: the caller receives EMPTY_ITERATOR either way
                }
            }
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }
        return new TxnLogProposalIterator(itr);
    }
    

    truncateLog方法

    把txnlog数据truncate到指定的zxid位置,然后重新加载DataTree数据:

    /**
     * Clears the in-memory state, truncates the transaction log to {@code zxid} and, if the
     * truncation succeeded, reloads the database from disk.
     *
     * @param zxid the zxid to truncate the log to
     * @return {@code true} if the log was truncated and the database reloaded
     * @throws IOException if truncating or reloading fails
     */
    public boolean truncateLog(long zxid) throws IOException {
        clear();

        if (snapLog.truncateLog(zxid)) {
            // the log was cut back, so rebuild the in-memory state from disk
            loadDataBase();
            return true;
        }
        return false;
    }
    

    deserializeSnapshot方法

    /**
     * Clears the database and rebuilds it from a snapshot read from the given archive.
     *
     * @param ia archive to read the snapshot from
     * @throws IOException if deserialization fails
     */
    public void deserializeSnapshot(InputArchive ia) throws IOException {
        clear();
        final DataTree tree = getDataTree();
        SerializeUtils.deserializeSnapshot(tree, ia, getSessionWithTimeOuts());
        initialized = true;
    }
    
    /**
     * Clears the database and rebuilds it from a snapshot, checking the seal integrity
     * after each section that is read.
     *
     * @param ia archive to read the snapshot from
     * @param is checked stream backing {@code ia}, used for the integrity checks
     * @throws IOException if deserialization or an integrity check fails
     */
    public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
        clear();

        // section 1: the data tree together with the session table
        final DataTree tree = getDataTree();
        FileSnap.deserialize(tree, getSessionWithTimeOuts(), ia);
        SnapStream.checkSealIntegrity(is, ia);

        // section 2: digest; the seal is checked only when the section was read
        final boolean digestRead = tree.deserializeZxidDigest(ia, 0);
        if (digestRead) {
            SnapStream.checkSealIntegrity(is, ia);
        }

        // section 3: lastProcessedZxid; the seal is checked only when the section was read
        final boolean zxidRead = tree.deserializeLastProcessedZxid(ia);
        if (zxidRead) {
            SnapStream.checkSealIntegrity(is, ia);
        }

        // when the snapshot carried a digest, compare it to detect inconsistency
        if (tree.getDigestFromLoadedSnapshot() != null) {
            tree.compareSnapshotDigests(tree.lastProcessedZxid);
        }

        initialized = true;
    }
    

    serializeSnapshot方法

    /**
     * Writes the data tree and the session table to the given archive as a snapshot.
     *
     * @param oa archive to write the snapshot to
     * @throws IOException          if writing fails
     * @throws InterruptedException if serialization is interrupted
     */
    public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
        final DataTree tree = getDataTree();
        SerializeUtils.serializeSnapshot(tree, oa, getSessionWithTimeOuts());
    }
    

    DataTree

    维护树状结构,没有任何网络或客户端连接代码,因此可以以独立的方式进行测试。

    维护两个并行的数据结构:一个从完整路径映射到DataNodes的哈希表和一个DataNodes树,对路径的所有访问都是通过哈希表进行的,只有在序列化到磁盘时才遍历DataNodes树。

    关键字段

    // This map provides a fast lookup to the data nodes
    private final NodeHashMap nodes;
    // Watcher
    private IWatchManager dataWatches;
    private IWatchManager childWatches;
    // cached total size of paths and data for all DataNodes
    private final AtomicLong nodeDataSize = new AtomicLong(0);
    // This hashtable lists the paths of the ephemeral nodes of a session
    private final Map> ephemerals = new ConcurrentHashMap<>();
    // This set contains the paths of all container nodes
    private final Set containers = Collections.newSetFromMap(new ConcurrentHashMap<>());
    // This set contains the paths of all ttl nodes
    private final Set ttls = Collections.newSetFromMap(new ConcurrentHashMap<>());
    // This is a pointer to the root of the DataTree
    private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted());
    // create a /zookeeper filesystem that is the proc filesystem of zookeeper
    private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
    // create a /zookeeper/quota node for maintaining quota properties for zookeeper
    private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
    // 最新被处理的zxid
    public volatile long lastProcessedZxid = 0;
    
    • NodeHashMap - NodeHashMapImpl实现类使用ConcurrentHashMap保存path -> DataNode数据
    • IWatchManager和Watcher - 监听器管理
    • DataNode - 封装树节点信息,包括data、children、stat等

    构造方法

    /**
     * Builds an empty tree containing the root, {@code /zookeeper}, {@code /zookeeper/quota}
     * and {@code /zookeeper/config} nodes, and creates the data and child watch managers.
     *
     * @param digestCalculator calculator handed to the node map
     * @throws IllegalStateException if a watch manager cannot be created
     */
    DataTree(DigestCalculator digestCalculator) {
        this.digestCalculator = digestCalculator;
        nodes = new NodeHashMapImpl(digestCalculator);

        // rather than fight it, let root have an alias
        nodes.put("", root); // "" -> root
        nodes.putWithoutDigest(rootZookeeper, root); // "/" -> root

        // add the proc node and quota node
        root.addChild(procChildZookeeper); // register the "zookeeper" child
        nodes.put(procZookeeper, procDataNode); // "/zookeeper" -> procDataNode

        procDataNode.addChild(quotaChildZookeeper); // register the "quota" child
        nodes.put(quotaZookeeper, quotaDataNode); // "/zookeeper/quota" -> quotaDataNode

        addConfigNode(); // adds the /zookeeper/config node

        nodeDataSize.set(approximateDataSize());
        try {
            // the factory decides which watch manager implementation is used
            dataWatches = WatchManagerFactory.createWatchManager();
            childWatches = WatchManagerFactory.createWatchManager();
        } catch (Exception e) {
            // Fail fast: swallowing this would leave the watch managers null and
            // surface later as an unrelated NullPointerException.
            throw new IllegalStateException("Unexpected exception when creating WatchManager", e);
        }
    }
    
    /**
     * Registers the config node: adds it as a child of the {@code /zookeeper} node (when that
     * node is present), stores an empty node under {@code configZookeeper} and restricts its ACL.
     */
    public void addConfigNode() {
        DataNode procNode = nodes.get(procZookeeper); // look up the /zookeeper node
        if (procNode != null) {
            procNode.addChild(configChildZookeeper); // register the "config" child
        }

        DataNode configNode = new DataNode(new byte[0], -1L, new StatPersisted());
        nodes.put(configZookeeper, configNode);
        try {
            // Reconfig node is access controlled by default (ZOOKEEPER-2014).
            setACL(configZookeeper, ZooDefs.Ids.READ_ACL_UNSAFE, -1);
        } catch (NoNodeException ignored) {
            // intentionally ignored: the node was stored under configZookeeper just above
        }
    }
    

    主要方法

    // Add a new node to the DataTree
    public void createNode(final String path, byte[] data, List acl, long ephemeralOwner,
                           int parentCVersion, long zxid, long time, Stat outputStat);
    // Remove path from the DataTree
    public void deleteNode(String path, long zxid);
    // 为节点设置数据
    public Stat setData(String path, byte[] data, int version, long zxid, long time);
    // 1. 获取path的data
    // 2. 如果watcher不为null则addWatch
    public byte[] getData(String path, Stat stat, Watcher watcher);
    // 使用node.copyStat(stat)保存stat数据
    public Stat statNode(String path, Watcher watcher);
    
    // 1. copyStat到stat中
    // 2. addWatch
    // 3. getChildren
    public List getChildren(String path, Stat stat, Watcher watcher);
    
    // 设置、获取权限
    public Stat setACL(String path, List acl, int version);
    public List getACL(String path, Stat stat);
    public List getACL(DataNode node);
    // 添加Watcher
    public void addWatch(String basePath, Watcher watcher, int mode);
    
    // 处理事务请求
    public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn);
    
    // 杀会话,使用deleteNodes删除paths2DeleteLocal和paths2DeleteInTxn集
    void killSession(
        long session, long zxid, Set paths2DeleteLocal, List paths2DeleteInTxn);
    // 遍历paths2Delete调用deleteNode方法删除节点
    void deleteNodes(long session, long zxid, Iterable paths2Delete);
    // 递归方式获取path下面的总节点数和总字节数
    private void getCounts(String path, Counts counts);
    
    // 序列化
    void serializeNode(OutputArchive oa, StringBuilder path);
    public void serializeNodeData(OutputArchive oa, String path, DataNode node);
    public void serializeAcls(OutputArchive oa);
    public void serializeNodes(OutputArchive oa);
    public void serialize(OutputArchive oa, String tag);
    // 反序列化
    public void deserialize(InputArchive ia, String tag);
    // 从dataWatches和childWatches移除watcher
    public void removeCnxn(Watcher watcher);
    // 触发或addWatch
    public void setWatches(long relativeZxid, List dataWatches,
                           List existWatches, List childWatches,
                           List persistentWatches, List persistentRecursiveWatches,
                           Watcher watcher);
    // 为path设置新的cversion和zxid
    public void setCversionPzxid(String path, int newCversion, long zxid);
    // Add the digest to the historical list, and update the latest zxid digest
    private void logZxidDigest(long zxid, long digest);
    // 序列化、反序列化lastProcessedZxidDigest
    public boolean serializeZxidDigest(OutputArchive oa);
    public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot);
    // 序列化、反序列化lastProcessedZxid
    public boolean serializeLastProcessedZxid(final OutputArchive oa);
    public boolean deserializeLastProcessedZxid(final InputArchive ia);
    // Compares the actual tree's digest with that in the snapshot.
    // Resets digestFromLoadedSnapshot after comparison.
    public void compareSnapshotDigests(long zxid);
    // Compares the digest of the tree with the digest present in transaction digest.
    // If there is any error, logs and alerts the watchers.
    public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest);
    

    createNode方法

    processTxn中会使用该方法创建节点:

    /**
     * Adds a new node to the tree; used by {@code processTxn} to apply create transactions.
     *
     * @param path           full path of the node to create
     * @param data           payload of the new node
     * @param acl            ACL list of the new node
     * @param ephemeralOwner owner encoding: 0 for a regular node, otherwise a session id or a
     *                       container/TTL marker as decoded by {@code EphemeralType.get}
     * @param parentCVersion child version to set on the parent, or -1 to increment the current one
     * @param zxid           zxid of the creating transaction
     * @param time           creation time
     * @param outputStat     if non-null, receives a copy of the new node's stat
     * @throws NoNodeException     if the parent node does not exist
     * @throws NodeExistsException if the parent already has a child with this name
     */
    public void createNode(final String path, byte[] data, List acl,
                           long ephemeralOwner, int parentCVersion, long zxid,
                           long time, Stat outputStat) throws NoNodeException, NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = createStat(zxid, time, ephemeralOwner); // Create a node stat
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            // the parent must exist; report it instead of failing with an NPE below
            throw new NoNodeException();
        }
        synchronized (parent) {
            Long acls = aclCache.convertAcls(acl);

            Set children = parent.getChildren();
            if (children.contains(childName)) {
                // the node must not exist yet; processTxn relies on this NODEEXISTS error
                throw new NodeExistsException();
            }

            nodes.preChange(parentName, parent); // runs removeDigest
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++; // bump the child version
            }

            if (parentCVersion > parent.stat.getCversion()) {
                parent.stat.setCversion(parentCVersion); // parent's child version
                parent.stat.setPzxid(zxid); // parent's pzxid
            }
            DataNode child = new DataNode(data, acls, stat);
            parent.addChild(childName); // attach the child to its parent
            nodes.postChange(parentName, parent);
            nodeDataSize.addAndGet(getNodeSize(path, child.data));
            nodes.put(path, child); // keep the NodeHashMap in sync
            EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                // track the ephemeral node under its owner
                HashSet list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
                synchronized (list) {
                    list.add(path);
                }
            }
            if (outputStat != null) {
                child.copyStat(outputStat); // copy the new node's stat into outputStat
            }
        }

        // ... (omitted in this excerpt)

        // fire the watches
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
        childWatches.triggerWatch(
            parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
    }
    

    deleteNode方法

    /**
     * Removes the node at {@code path} from the tree and fires the matching watches.
     *
     * <p>The parent's child list and pzxid are updated before the node itself is looked up,
     * so they are brought up to date even when the node turns out to be missing.
     *
     * @param path full path of the node to delete
     * @param zxid zxid of the deleting transaction
     * @throws NoNodeException if the parent or the node itself does not exist
     */
    public void deleteNode(String path, long zxid) throws NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);

        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            // the parent must exist; report it instead of failing with an NPE below
            throw new NoNodeException();
        }
        synchronized (parent) {
            nodes.preChange(parentName, parent);
            parent.removeChild(childName); // detach the child
            if (zxid > parent.stat.getPzxid()) {
                parent.stat.setPzxid(zxid);
            }
            nodes.postChange(parentName, parent);
        }

        DataNode node = nodes.get(path);
        if (node == null) {
            // the node must exist
            throw new NoNodeException();
        }
        nodes.remove(path); // drop it from the NodeHashMap
        synchronized (node) {
            aclCache.removeUsage(node.acl);
            nodeDataSize.addAndGet(-getNodeSize(path, node.data));
        }

        synchronized (parent) {
            long owner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(owner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.remove(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.remove(path);
            } else if (owner != 0) { // forget the ephemeral node
                // named sessionPaths so it does not shadow the 'nodes' field
                Set sessionPaths = ephemerals.get(owner);
                if (sessionPaths != null) {
                    synchronized (sessionPaths) {
                        sessionPaths.remove(path);
                    }
                }
            }
        }

        // ... (omitted in this excerpt)

        // fire the watches
        WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
        childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
        childWatches.triggerWatch(
            "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
    }
    

    setData方法

    /**
     * Replaces the data of the node at {@code path} and fires its data watches.
     *
     * @param path    full path of the node
     * @param data    new payload
     * @param version version to store in the node's stat
     * @param zxid    zxid of the modifying transaction
     * @param time    modification time
     * @return a copy of the node's stat after the update
     * @throws NoNodeException if the node does not exist
     */
    public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            // the node must exist; report it instead of failing with an NPE below
            throw new NoNodeException();
        }
        byte[] lastData; // previous payload; presumably consumed by the omitted code below
        synchronized (n) {
            lastData = n.data;
            nodes.preChange(path, n);
            n.data = data; // new payload
            n.stat.setMtime(time); // modification time
            n.stat.setMzxid(zxid); // zxid of the modification
            n.stat.setVersion(version); // data version
            n.copyStat(s); // snapshot the stat for the caller
            nodes.postChange(path, n);
        }

        // ... (omitted in this excerpt)
        // fire the watches
        dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
        return s;
    }
    

    setAcl等acl方法

    /**
     * Replaces the ACL of the node at {@code path}.
     *
     * @param path    full path of the node
     * @param acl     new ACL list
     * @param version ACL version to store in the node's stat
     * @return a copy of the node's stat after the update
     * @throws NoNodeException if the node does not exist
     */
    public Stat setACL(String path, List acl, int version) throws NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            // report a missing node instead of failing with an NPE on the monitor below
            throw new NoNodeException();
        }
        synchronized (n) {
            Stat stat = new Stat();
            aclCache.removeUsage(n.acl);
            nodes.preChange(path, n);
            n.stat.setAversion(version); // ACL version (not an access time)
            n.acl = aclCache.convertAcls(acl); // store the new ACL reference
            n.copyStat(stat);
            nodes.postChange(path, n);
            return stat;
        }
    }
    
    /**
     * Returns a copy of the ACL list of the node at {@code path}.
     *
     * @param path full path of the node
     * @param stat if non-null, receives a copy of the node's stat
     * @return a new list holding the node's ACL entries
     * @throws NoNodeException if the node does not exist
     */
    public List getACL(String path, Stat stat) throws NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            // report a missing node instead of failing with an NPE on the monitor below
            throw new NoNodeException();
        }
        synchronized (n) {
            if (stat != null) {
                n.copyStat(stat);
            }
            return new ArrayList<>(aclCache.convertLong(n.acl));
        }
    }
    
    /**
     * Resolves the ACL list referenced by the given node, reading it under the node's monitor.
     *
     * @param node the node whose ACL is wanted
     * @return the list produced by {@code aclCache.convertLong} for the node's ACL reference
     */
    public List getACL(DataNode node) {
        List resolved;
        synchronized (node) {
            resolved = aclCache.convertLong(node.acl);
        }
        return resolved;
    }
    

    addWatch方法

    /**
     * Registers a watcher on {@code basePath}: always with the data watch manager, and also
     * with the child watch manager unless the mode is {@code PERSISTENT_RECURSIVE}.
     *
     * @param basePath path to watch
     * @param watcher  watcher to register
     * @param mode     numeric watch mode, converted via {@code WatcherMode.fromZooDef}
     */
    public void addWatch(String basePath, Watcher watcher, int mode) {
        final WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
        dataWatches.addWatch(basePath, watcher, watcherMode);
        if (watcherMode == WatcherMode.PERSISTENT_RECURSIVE) {
            // recursive mode is registered with the data watch manager only
            return;
        }
        childWatches.addWatch(basePath, watcher, watcherMode);
    }
    

    processTxn方法

    /**
     * Applies one transaction to the tree, dispatching on the header's op code.
     *
     * @param header   transaction header (client id, cxid, zxid, type, time)
     * @param txn      transaction record; cast to the concrete type matching the op code
     * @param isSubTxn when true, lastProcessedZxid is left untouched
     * @return the result; {@code err} carries the KeeperException code when applying failed
     */
    public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();
    
        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            case OpCode.create: // create a node
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(),
                    header.getZxid(), header.getTime(), null);
                break;
            case OpCode.create2: // create a node and return its stat
                CreateTxn create2Txn = (CreateTxn) txn;
                rc.path = create2Txn.getPath();
                Stat stat = new Stat();
                createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(),
                    create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(),
                    header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            case OpCode.createTTL:
                CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
                rc.path = createTtlTxn.getPath();
                stat = new Stat(); // reuses the local declared in the create2 case
                createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(),
                    EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), // ttl encoded as the owner
                    createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            case OpCode.createContainer:
                CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
                rc.path = createContainerTxn.getPath();
                stat = new Stat();
                createNode(createContainerTxn.getPath(), createContainerTxn.getData(),
                    createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                    createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
                rc.stat = stat;
                break;
            case OpCode.delete:
            case OpCode.deleteContainer:
                DeleteTxn deleteTxn = (DeleteTxn) txn;
                rc.path = deleteTxn.getPath();
                deleteNode(deleteTxn.getPath(), header.getZxid()); // delete the node
                break;
            case OpCode.reconfig: // falls through: handled exactly like setData
            case OpCode.setData: // set the node's data
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),
                    header.getZxid(), header.getTime());
                break;
            case OpCode.setACL: // set the ACL
                SetACLTxn setACLTxn = (SetACLTxn) txn;
                rc.path = setACLTxn.getPath();
                rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
                break;
            case OpCode.closeSession: // close the session
                long sessionId = header.getClientId();
                if (txn != null) {
                    // the session's ephemeral path set is removed from the map and handed
                    // over together with the paths listed in the transaction
                    killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId),
                            ((CloseSessionTxn) txn).getPaths2Delete());
                } else {
                    killSession(sessionId, header.getZxid());
                }
                break;
            case OpCode.error:
                ErrorTxn errTxn = (ErrorTxn) txn;
                rc.err = errTxn.getErr();
                break;
            case OpCode.check:
                CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
                rc.path = checkTxn.getPath();
                break;
            case OpCode.multi:
                // iterate over and process every sub-transaction (omitted in this excerpt)
                break;
            }
        } catch (KeeperException e) {
            // a failed operation is reported through the result code rather than thrown
            rc.err = e.code().intValue();
        } catch (IOException e) {}
        // NOTE(review): the IOException above is swallowed without any trace in this excerpt
    
        // A create that failed with NODEEXISTS still updates the parent's cversion and pzxid
        // from the values carried by the transaction
        if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
            int lastSlash = rc.path.lastIndexOf('/');
            String parentName = rc.path.substring(0, lastSlash);
            CreateTxn cTxn = (CreateTxn) txn;
            try {
                setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
            } catch (NoNodeException e) {
                rc.err = e.code().intValue();
            }
        }
        // Sub-transactions do not advance lastProcessedZxid
        if (!isSubTxn) {
            if (rc.zxid > lastProcessedZxid) {
                lastProcessedZxid = rc.zxid; // record the newest processed zxid
            }
    
            // ... (omitted in this excerpt)
        }
    
        return rc;
    }
    

    serialize相关方法

    /**
     * Recursively writes the node at {@code path} and all of its descendants to the archive.
     *
     * <p>{@code path} is used as a scratch buffer: each child name is appended for the
     * recursive call and the buffer is truncated again before the next child.
     *
     * @param oa   archive to write to
     * @param path buffer holding the path of the node to serialize
     * @throws IOException if writing fails
     */
    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
        String pathString = path.toString();
        DataNode node = getNode(pathString); // look the node up
        if (node == null) {
            // the node may have been removed since its parent's child list was copied
            return;
        }
        String[] children;
        DataNode nodeCopy;
        synchronized (node) {
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            // we do not need to make a copy of node.data because the contents are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
            children = node.getChildren().toArray(new String[0]);
        }
        serializeNodeData(oa, pathString, nodeCopy); // write this node to oa
        path.append('/');
        int off = path.length();
        // walk the children and write each subtree to oa
        for (String child : children) {
            path.setLength(off); // cut back to "<parent>/" before appending the next child
            path.append(child);
            serializeNode(oa, path);
        }
    }
    
    /**
     * Writes a single node to the archive: its path first, then the node record.
     * Visible for test.
     *
     * @param oa   archive to write to
     * @param path path of the node, written under the tag {@code "path"}
     * @param node node record, written under the tag {@code "node"}
     * @throws IOException if writing fails
     */
    public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
        // the path goes first, then the record, each under its own tag
        oa.writeString(path, "path");
        oa.writeRecord(node, "node");
    }
    
    /**
     * Writes the ACL cache to the archive.
     *
     * @param oa archive to write to
     * @throws IOException if writing fails
     */
    public void serializeAcls(OutputArchive oa) throws IOException {
        // the ACL cache writes itself to the archive
        aclCache.serialize(oa);
    }
    
    /**
     * Writes every node of the tree to the archive, starting from the root (empty path),
     * followed by the {@code "/"} end-of-stream marker.
     *
     * @param oa archive to write to
     * @throws IOException if writing fails
     */
    public void serializeNodes(OutputArchive oa) throws IOException {
        StringBuilder rootPath = new StringBuilder();
        serializeNode(oa, rootPath);
        // we need to check if clear had been called in between the snapshot.
        if (root == null) {
            return;
        }
        // / marks end of stream
        oa.writeString("/", "path");
    }
    
    /**
     * Full serialization: the ACL cache first, then all nodes.
     *
     * @param oa  archive to write to
     * @param tag not used by this method
     * @throws IOException if writing fails
     */
    public void serialize(OutputArchive oa, String tag) throws IOException {
        // ACLs go first so they precede the nodes in the stream
        serializeAcls(oa);
        serializeNodes(oa);
    }
    

    deserialize相关方法

    /**
     * Rebuilds the tree from an archive: reads the ACL cache, then (path, node) pairs until
     * the {@code "/"} end marker, re-linking each node to its parent and re-registering
     * container, TTL and ephemeral paths along the way.
     *
     * @param ia  archive to read from
     * @param tag not used by this method
     * @throws IOException if reading fails or a node's parent has not been read before it
     */
    public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        nodeDataSize.set(0);

        for (String path = ia.readString("path"); !"/".equals(path); path = ia.readString("path")) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }

            final int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                // a path without any slash is the root entry
                root = node;
                continue;
            }

            final String parentPath = path.substring(0, lastSlash);
            final DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException(
                        "Invalid Datatree, unable to find parent " + parentPath + " of path " + path);
            }
            parent.addChild(path.substring(lastSlash + 1));

            final long owner = node.stat.getEphemeralOwner();
            final EphemeralType ephemeralType = EphemeralType.get(owner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (owner != 0) {
                HashSet ownedPaths = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
                ownedPaths.add(path);
            }
        }
        // have counted digest for root node with "", ignore here to avoid counting twice for root node
        nodes.putWithoutDigest("/", root);

        nodeDataSize.set(approximateDataSize());

        // we are done with deserializing the datatree update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }
    

    FileTxnSnapLog

    操作TxnLog和SnapShot的入口类。

    构造方法会创建dataDir和snapDir目录,判断数据目录可写,创建txnLog和snapLog对象访问数据文件。

    主要方法

    // Loads the database from the snapshots and transaction logs.
    public long restore(DataTree dt, Map sessions, PlayBackListener listener);
    
    // Fast-forwards the server database so that it contains the latest transactions.
    // Same as restore, but reads only from the transaction logs and does not restore from a snapshot.
    public long fastForwardFromEdits(DataTree dt, Map sessions, PlayBackListener listener);
    
    // Loads a TxnIterator starting at the given zxid via txnLog.read(zxid, fastForward).
    public TxnIterator readTxnLog(long zxid, boolean fastForward);
    
    // Processes the transaction on the data tree.
    public void processTransaction(TxnHeader hdr, DataTree dt, Map sessions, Record txn);
    
    // Returns the last logged zxid via txnLog.getLastLoggedZxid().
    public long getLastLoggedZxid();
    
    // Saves the data tree and the sessions into a snapshot.
    public File save(
        DataTree dataTree, ConcurrentHashMap sessionsWithTimeouts, boolean syncSnap);
    
    // Truncates the txnLog to the given zxid.
    public boolean truncateLog(long zxid);
    
    // Finds the most recent snapshot file via snaplog.findMostRecentSnapshot().
    public File findMostRecentSnapshot();
    // Finds the n most recent snapshot files via snaplog.findNRecentSnapshots(n).
    public List findNRecentSnapshots(int n);
    // Finds the n most recent valid snapshot files via snaplog.findNValidSnapshots(n).
    public List findNValidSnapshots(int n);
    
    // Returns the log files that may contain transactions newer than the given zxid.
    // This covers logs whose starting zxid is greater than the given zxid, plus the newest
    // transaction log whose starting zxid is lower than the given zxid.
    // The latter log file may contain transactions beyond the given zxid.
    public File[] getSnapshotLogs(long zxid);
    
    // Appends the request via txnLog.append(si).
    public boolean append(Request si);
    // Commits the data via txnLog.commit().
    public void commit();
    

    restore方法

    1. 从snapshot加载dataTree数据
    2. 从txnlog加载dataTree和committedlog数据
    3. 如果没有加载到dataTree数据,将空的dataTree数据保存到snapshot.0文件中

    fastForwardFromEdits方法

    从txnlog加载dataTree和committedlog数据。

    processTransaction方法

    // Applies one transaction to the data tree and keeps the session table in sync.
    //   hdr      - transaction header; its type decides how the session table is updated
    //   dt       - data tree the transaction is applied to
    //   sessions - session id -> timeout table; updated for createSession / closeSession
    //   txn      - transaction body
    public void processTransaction(TxnHeader hdr, DataTree dt, Map sessions,
            Record txn) throws KeeperException.NoNodeException {
        switch (hdr.getType()) {
        case OpCode.createSession:
            sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
            break;
        case OpCode.closeSession:
            sessions.remove(hdr.getClientId());
            break;
        default:
            // no session bookkeeping for other transaction types
            break;
        }
        // Every transaction type, the session ones included, is handed to the data tree,
        // which gives it a chance to sync its lastProcessedZxid. The result was never
        // inspected in this excerpt, so it is not kept in a local.
        dt.processTxn(hdr, txn);
    }
    

    save方法

    // Serializes the data tree and the sessions into a new snapshot file and returns that file.
    //   dataTree             - tree to persist; its lastProcessedZxid names the file
    //   sessionsWithTimeouts - session table persisted alongside the tree
    //   syncSnap             - passed through to snapLog.serialize
    // Any IOException raised by the serialization propagates to the caller unchanged.
    public File save(DataTree dataTree, ConcurrentHashMap sessionsWithTimeouts,
            boolean syncSnap) throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        // file name: snapshot.${lastZxid}
        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
        // No try/catch here: catching the IOException only to rethrow it added nothing.
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
        return snapshotFile;
    }
    

    truncateLog方法

    // Truncates the transaction log to the given zxid.
    // Returns the result of FileTxnLog.truncate, or false when an IOException occurs
    // anywhere in the close / truncate / reopen sequence.
    public boolean truncateLog(long zxid) {
        try {
            // close the existing txnLog and snapLog
            close();
    
            // truncate it; the temporary FileTxnLog is closed by try-with-resources
            try (FileTxnLog truncLog = new FileTxnLog(dataDir)) {
                boolean truncated = truncLog.truncate(zxid);
    
                // re-open the txnLog and snapLog
                // I'd rather just close/reopen this object itself, however that
                // would have a big impact outside ZKDatabase as there are other
                // objects holding a reference to this object.
                txnLog = new FileTxnLog(dataDir);
                snapLog = new FileSnap(snapDir);
    
                return truncated;
            }
        } catch (IOException e) {
            // the failure is reported to the caller only through the false return value
            return false;
        }
    }
    

    TxnLog接口和FileTxnLog实现类

    txnlog

    使用文件保存所有的事务操作,客户端的写操作会先写入txnlog文件,在follower达到quorum状态后提交到dataTree中,在ZKDatabase启动阶段,如果txnlog的zxid大于snapshot的zxid时,会加载txnlog文件数据回放事务,提交到dataTree中。

    TxnLog接口

    Interface for reading transaction logs.

    // Contract for the transaction log; extends Closeable.
    public interface TxnLog extends Closeable {
    
        // Setter for ServerStats to monitor fsync threshold exceed
        void setServerStats(ServerStats serverStats);
    
        // roll the current log being appended to
        void rollLog() throws IOException;
    
        // Append a request to the transaction log with a digest
        boolean append(Request request) throws IOException;
    
        // Start reading the transaction logs from a given zxid
        TxnIterator read(long zxid) throws IOException;
    
        // the last zxid of the logged transactions
        long getLastLoggedZxid() throws IOException;
    
        // truncate the log to get in sync with the leader
        boolean truncate(long zxid) throws IOException;
    
        // the dbid for this transaction log
        long getDbId() throws IOException;
    
        // commit the transaction and make sure they are persisted
        void commit() throws IOException;
    
        // return transaction log's elapsed sync time in milliseconds
        long getTxnLogSyncElapsedTime();
    
        // close the transaction log (redeclares Closeable.close)
        void close() throws IOException;
        // setter / getter for the total log size; exact semantics are up to the implementation
        void setTotalLogSize(long size);
        long getTotalLogSize();
    }
    

    FileTxnLog实现类

    This class implements the TxnLog interface. It provides api's to access the txnlogs and add entries to it.
    The format of a Transactional log is as follows:
    
       LogFile:
           FileHeader TxnList ZeroPad
    
       FileHeader: {
           magic 4bytes (ZKLG)
           version 4bytes
           dbid 8bytes
       }
    
       TxnList:
           Txn || Txn TxnList
    
       Txn:
           checksum Txnlen TxnHeader Record 0x42
    
       checksum: 8bytes Adler32 is currently used
         calculated across payload -- Txnlen, TxnHeader, Record and 0x42
    
       Txnlen:
           len 4bytes
    
       TxnHeader: {
           sessionid 8bytes
           cxid 4bytes
           zxid 8bytes
           time 8bytes
           type 4bytes
       }
    
       Record:
           See Jute definition file for details on the various record types
    
       ZeroPad:
           0 padded to EOF (filled during preallocation stage)
    

    FileTxnLog主要方法实现

    // Appends a request to the transaction log.
    // Returns false when the request carries no transaction header, true once the entry
    // has been written to the (buffered) log stream.
    public synchronized boolean append(Request request) throws IOException {
        TxnHeader hdr = request.getHdr();
        if (hdr == null) { // not a transactional request
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            // the zxid did not advance past the last one seen: only warn, the entry is still written
            LOG.warn("...");
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream == null) {
            // no open log stream: create a new log.${hdr.zxid} file
            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId); // file header
            long dataSize = oa.getDataSize();
            fhdr.serialize(oa, "fileheader"); // write the file header
            logStream.flush();
            // advance the file offset by the number of bytes the header occupied
            filePosition += oa.getDataSize() - dataSize;
            filePadding.setCurrentSize(filePosition);
            // remember the stream so that commit() flushes it
            streamsToFlush.add(fos);
        }
        fileSize = filePadding.padFile(fos.getChannel(), filePosition);
        byte[] buf = request.getSerializeData();
        long dataSize = oa.getDataSize();
        // the checksum is computed over the serialized request bytes
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC"); // checksum
        Util.writeTxnBytes(oa, buf); // writes len, hdr, txn, digest, 0x42
        unFlushedSize += oa.getDataSize() - dataSize; // track the bytes not flushed yet
        return true;
    }
    
    // Returns the zxid of the last transaction that can be read from the log directory.
    // Falls back to the zxid taken from the last log file's name, or -1 when there is no log file.
    public long getLastLoggedZxid() {
        File[] logFiles = getLogFiles(logDir.listFiles(), 0);
        // the suffix of the newest log file's name is the starting value
        long maxLog = -1;
        if (logFiles.length > 0) {
            maxLog = Util.getZxidFromName(logFiles[logFiles.length - 1].getName(), LOG_FILE_PREFIX);
        }
        long lastZxid = maxLog;
        // walk the log from maxLog onwards; the header of the last readable entry wins
        try (FileTxnLog txn = new FileTxnLog(logDir); TxnIterator itr = txn.read(maxLog)) {
            while (itr.next()) {
                lastZxid = itr.getHeader().getZxid();
            }
        } catch (IOException ignored) {
            // best effort: on a read failure the value gathered so far is returned
        }
        return lastZxid;
    }
    
    // Rolls the log: flushes the current stream and resets the write state, so that the
    // next append() creates a new log file. Does nothing when no log stream is open.
    public synchronized void rollLog() throws IOException {
        if (logStream == null) {
            return;
        }
        // push out whatever is still buffered for the current file
        logStream.flush();
        // account for the current file's size before the counters below are reset
        prevLogsRunningTotal += getCurrentLogSize();
        logStream = null;
        oa = null;
        fileSize = 0;
        filePosition = 0;
        unFlushedSize = 0;
    }
    
    // Commits the log: flushes the buffered stream, flushes (and optionally forces to disk)
    // every stream queued in streamsToFlush, closes all but one queued stream, and rolls the
    // log when the configured size limit is exceeded.
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush(); // flush the buffered data to the file stream
            filePosition += unFlushedSize;
            // If we have written more than we have previously preallocated,
            // we should override the fileSize by filePosition.
            if (filePosition > fileSize) {
                fileSize = filePosition;
            }
            unFlushedSize = 0;
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush(); // flush the file stream
            if (forceSync) {
                long startSyncNS = System.nanoTime();
    
                FileChannel channel = log.getChannel();
                channel.force(false);
    
                // (remainder omitted in this excerpt)
            }
        }
        // close and drop queued streams until only one remains
        while (streamsToFlush.size() > 1) {
            streamsToFlush.poll().close();
        }
    
        // Roll the log file if we exceed the size limit
        if (txnLogSizeLimit > 0) { // per the original note the default is -1, so this branch is skipped by default
            long logSize = getCurrentLogSize();
            if (logSize > txnLogSizeLimit) {
                rollLog();
            }
        }
    }
    
    // FileTxnIterator wraps the log file and its input stream and reads txnLog entries
    // from the file stream according to the log format.
    // Starts reading at the given zxid with fastForward enabled.
    public TxnIterator read(long zxid) throws IOException {
        return read(zxid, true);
    }
    // Starts reading at the given zxid; fastForward is passed on to the FileTxnIterator constructor.
    public TxnIterator read(long zxid, boolean fastForward) throws IOException {
        return new FileTxnIterator(logDir, zxid, fastForward);
    }
    
    // Truncates the transaction log at the position the iterator reaches for the given zxid.
    // The log file the iterator is positioned in is cut at the current read position and all
    // following log files are deleted.
    // Returns true on completion; throws IOException when no log file is found or the
    // truncation fails.
    public boolean truncate(long zxid) throws IOException {
        try (FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid)) {
            PositionInputStream input = itr.inputStream;
            if (input == null) {
                throw new IOException("No log files found to truncate");
            }
            long pos = input.getPosition();
            // now, truncate at the current position; try-with-resources closes the file
            // even when setLength throws, which the manual close() did not guarantee
            try (RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw")) {
                raf.setLength(pos);
            }
            // delete every log file after the truncated one
            while (itr.goToNextLog()) {
                if (!itr.logFile.delete()) {
                    // deletion failure is deliberately tolerated, as in the original excerpt
                }
            }
        }
        return true;
    }
    
    // Reads and returns the FileHeader stored at the beginning of the given log file.
    // Throws IOException when the file cannot be opened or the header cannot be deserialized.
    private static FileHeader readHeader(File file) throws IOException {
        // try-with-resources closes the stream on every path; the original excerpt left the
        // close commented out, so the file handle was never released
        try (InputStream is = new BufferedInputStream(new FileInputStream(file))) {
            InputArchive ia = BinaryInputArchive.getArchive(is);
            FileHeader hdr = new FileHeader();
            hdr.deserialize(ia, "fileheader"); // deserialize the header
            return hdr;
        }
    }
    

    FileTxnIterator类

    this class implements the txnlog iterator interface which is used for reading the transaction logs.

    内部使用List保存着比指定zxid大或者含有指定zxid数据的log文件,初始化阶段会定位到参数zxid指定的位置,这样在后续访问时就可以从参数指定的zxid开始读取数据了。

    // Creates an iterator over the log files in logDir, positioned via init().
    // With fastForward set, entries are skipped until the current header's zxid is no longer
    // lower than the requested zxid, or until the log is exhausted.
    public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
        this.logDir = logDir;
        this.zxid = zxid;
        init();
    
        if (!fastForward || hdr == null) {
            return;
        }
        // advance to the requested zxid; stop as soon as next() reports no more entries
        boolean hasMore = true;
        while (hasMore && hdr.getZxid() < zxid) {
            hasMore = next();
        }
    }
    
    // Collects the log files to iterate over and positions the iterator on the first entry.
    // The files are scanned in descending order: every file whose name zxid is >= the target
    // zxid is kept, plus the first file whose name zxid is lower than the target.
    void init() throws IOException {
        storedFiles = new ArrayList<>();
        // descending sort of the log files; the element type is restored so the loop compiles
        List<File> files = Util.sortDataDir(
            FileTxnLog.getLogFiles(logDir.listFiles(), 0),
            LOG_FILE_PREFIX,
            false);
        for (File f : files) {
            // parse the zxid once; both original branches stored the file anyway
            long fileZxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            storedFiles.add(f);
            if (fileZxid < zxid) {
                // this is the last logfile that is less than the zxid
                break;
            }
        }
        goToNextLog(); // move to the next file
        next(); // move to the next log entry
    }
    

    SnapShot接口和FileSnap实现类

    SnapShot接口

    snapshot interface for the persistence layer. implement this interface for implementing snapshots.

    // Snapshot contract of the persistence layer.
    public interface SnapShot {
    
        // deserialize a data tree from the last valid snapshot and return the last zxid that was deserialized
        long deserialize(DataTree dt, Map sessions) throws IOException;
    
        // persist the datatree and the sessions into a persistence storage;
        // name is the target file, fsync is passed on to the implementation
        void serialize(DataTree dt, Map sessions, File name, boolean fsync) throws IOException;
    
        // find the most recent snapshot file
        File findMostRecentSnapshot() throws IOException;
    
        // get information of the last saved/restored snapshot
        SnapshotInfo getLastSnapshotInfo();
    
        // free resources from this snapshot immediately
        void close() throws IOException;
    }
    

    FileSnap实现类

    负责存储、序列化和反序列化正确的快照。并提供对快照的访问:

    // Loads the newest loadable snapshot into dt and sessions.
    // Returns the zxid of the loaded snapshot, or -1 when there is no snapshot file at all.
    // Throws IOException when snapshot files exist but none of them can be loaded.
    public long deserialize(DataTree dt, Map sessions) throws IOException {
        // valid snapshot files under snapDir, newest first (at most 100 are tried)
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.isEmpty()) {
            // nothing to load; without this guard snap stays null and is dereferenced below
            return -1L;
        }
        File snap = null;
        long snapZxid = -1;
        boolean foundValid = false;
        for (File candidate : snapList) {
            snap = candidate;
            snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
            try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
                InputArchive ia = BinaryInputArchive.getArchive(snapIS);
                deserialize(dt, sessions, ia); // deserialize the data into dt
                SnapStream.checkSealIntegrity(snapIS, ia);
    
                // Deserializing the zxid digest from the input
                // stream and update the digestFromLoadedSnapshot.
                // format: zxid digestVersion digest
                if (dt.deserializeZxidDigest(ia, snapZxid)) {
                    SnapStream.checkSealIntegrity(snapIS, ia);
                }
    
                // deserialize lastProcessedZxid and check inconsistency
                if (dt.deserializeLastProcessedZxid(ia)) {
                    SnapStream.checkSealIntegrity(snapIS, ia);
                }
    
                foundValid = true;
                break;
            } catch (IOException ignored) {
                // this snapshot could not be loaded; try the next (older) one
            }
        }
        // the excerpt only hinted at this check in a comment
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots");
        }
        // zxid processed up to by the loaded snapshot
        dt.lastProcessedZxid = snapZxid;
        lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
    
        // compare the digest if this is not a fuzzy snapshot, we want to compare and find inconsistent asap
        if (dt.getDigestFromLoadedSnapshot() != null) {
            dt.compareSnapshotDigests(dt.lastProcessedZxid);
        }
        return dt.lastProcessedZxid;
    }
    
    // Reads one snapshot from the archive into dt and sessions.
    // Throws IOException when the file header's magic value does not match SNAP_MAGIC.
    public static void deserialize(
            DataTree dt, Map sessions, InputArchive ia) throws IOException {
        // file header layout: magic, version, dbid
        FileHeader fileHeader = new FileHeader();
        fileHeader.deserialize(ia, "fileheader");
        boolean magicMatches = fileHeader.getMagic() == SNAP_MAGIC;
        if (!magicMatches) {
            throw new IOException("mismatching magic headers");
        }
        // Snapshot payload layout:
        //   sessions: Count Session(s)
        //             Session {id, timeout}
        //   nodes:    AclCache PathNode(s)
        //             PathNode {path, node}
        //             node {data, acl, stat}
        SerializeUtils.deserializeSnapshot(dt, ia, sessions);
    }
    
    // Returns up to n valid snapshot files from snapDir, newest first.
    // Files that fail validation, or whose validation throws an IOException, are skipped.
    protected List findNValidSnapshots(int n) {
        // snapshot files under snapDir in descending order; the element type is restored
        // so the loop below compiles
        List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
        List<File> valid = new ArrayList<>();
        for (File f : files) {
            try {
                if (SnapStream.isValidSnapshot(f)) { // keep only valid files
                    valid.add(f);
                    // the list size replaces the separate counter of the original
                    if (valid.size() == n) {
                        break;
                    }
                }
            } catch (IOException ignored) {
                // a file that cannot be validated is treated as invalid and skipped
            }
        }
        return valid;
    }
    
    // Returns up to n of the most recent snapshot files from snapDir, newest first.
    // Unlike findNValidSnapshots, files are accepted on their name alone: the name must
    // yield a zxid other than -1.
    public List findNRecentSnapshots(int n) throws IOException {
        // snapshot files under snapDir in descending order; the element type is restored
        // so the loop below compiles
        List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
        List<File> recent = new ArrayList<>();
        for (File f : files) {
            // the list size replaces the separate counter of the original
            if (recent.size() == n) {
                break;
            }
            if (Util.getZxidFromName(f.getName(), SNAPSHOT_FILE_PREFIX) != -1) {
                recent.add(f);
            }
        }
        return recent;
    }
    
    // Writes the file header followed by the data tree and the sessions to the archive.
    // Throws IllegalStateException when header is null, IOException on write failures.
    protected void serialize(
            DataTree dt, Map sessions, OutputArchive oa, FileHeader header) throws IOException {
        // a missing header is a programming error; the excerpt only hinted at this check
        // in a comment, which left a bare NullPointerException
        if (header == null) {
            throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
        }
        header.serialize(oa, "fileheader"); // serialize the file header
        SerializeUtils.serializeSnapshot(dt, oa, sessions); // serialize the data tree
    }
    
    // Writes the data tree and the sessions into the given snapshot file and records the
    // snapshot info afterwards. Throws IOException when this FileSnap has been closed.
    public synchronized void serialize(
            DataTree dt, Map sessions, File snapShot, boolean fsync) throws IOException {
        if (close) {
            throw new IOException("FileSnap has already been closed");
        }
        try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
            OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
            // header + tree + sessions, followed by a seal
            serialize(dt, sessions, oa, new FileHeader(SNAP_MAGIC, VERSION, dbId));
            SnapStream.sealStream(snapOS, oa);
    
            // digest section, sealed only when it was written
            if (dt.serializeZxidDigest(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }
    
            // lastProcessedZxid section, sealed only when it was written
            if (dt.serializeLastProcessedZxid(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }
    
            long snapZxid = Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX);
            lastSnapshotInfo = new SnapshotInfo(snapZxid, snapShot.lastModified() / 1000);
        }
    }
    

    DatadirCleanupManager

    启动周期任务

    清理过期文件,保留最新的snapRetainCount个snapshot文件和对应的txnlog文件,将其余过期的文件删除掉。

    purgeInterval参数指定执行周期(小时),默认0不开启清理功能。

    // Starts the periodic purge task. Does nothing when the task is already started or
    // when purgeInterval is zero or negative.
    public void start() {
        boolean alreadyStarted = PurgeTaskStatus.STARTED == purgeTaskStatus;
        // Don't schedule the purge task with zero or negative purge interval.
        boolean purgeDisabled = purgeInterval <= 0;
        if (alreadyStarted || purgeDisabled) {
            return;
        }
    
        // purgeInterval is expressed in hours
        long periodMillis = TimeUnit.HOURS.toMillis(purgeInterval);
        timer = new Timer("PurgeTask", true);
        timer.scheduleAtFixedRate(new PurgeTask(dataLogDir, snapDir, snapRetainCount), 0, periodMillis);
    
        purgeTaskStatus = PurgeTaskStatus.STARTED;
    }
    

    PurgeTask

    // Runs one purge pass over the log and snapshot directories.
    public void run() {
        try {
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception ignored) {
            // a failed purge pass is dropped here and never propagates out of run()
        }
    }
    

    PurgeTxnLog.purge方法:

    // Purges old snapshot and transaction log files, keeping the num most recent valid snapshots.
    //   dataDir - transaction log directory
    //   snapDir - snapshot directory
    //   num     - number of snapshots to retain; must be at least 3
    // Throws IllegalArgumentException when num < 3.
    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }
    
        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    
        // the num most recent valid snapshot files, newest first; the element type is
        // restored so the oldest retained snapshot can be passed on as a File
        List<File> snaps = txnLog.findNValidSnapshots(num);
        if (!snaps.isEmpty()) {
            // delete the txnlog and snapshot files older than the oldest retained snapshot
            purgeOlderSnapshots(txnLog, snaps.get(snaps.size() - 1));
        }
    }
    

    ContainerManager

    负责清理container节点,只能由leader管理。启动后,定期检查cversion>0且没有子级的container节点和ttl节点。尝试删除节点,删除的结果并不重要。如果提议失败或容器节点不为空,则没有任何危害。

  • 相关阅读:
    Android12 适配 usb 触屏
    【LeetCode75】第七十一题 搜索推荐系统
    【Python百日进阶-Web开发-Peewee】Day278 - SQLite 扩展(三)
    go: no such tool “compile“(一次糟糕体验)
    不开窗求每个人当日累计的答题数
    全国双非院校考研信息汇总整理 Part.6
    iOS端如何实现MobLink的场景还原功能
    XSS 从 PDF 中窃取数据
    curlpp(curl C++) http错误码和重定向后的url获取
    STM32 IAP相关的FLASH数据读取和跳转
  • 原文地址:https://www.cnblogs.com/xugf/p/17953507