• 【ZooKeeper】zookeeper源码5-ZKDatabase冷启动恢复


    源码项目zookeeper-3.6.3:核心工作流程

    入口类:QuorumPeerMain
    main方法中,首先构建一个实例,由main.initializedAndRun(args)方法进入。
    initializedAndRun表明初始化和启动两件事,实际做了三件事:
    第一:解析配置文件config.parse(args[0]),解析zoo.cfg,解析myid
    第二:生成定时任务,目标清理旧快照文件DatadirCleanupManager
    第三:runFromConfig(config)从配置中启动QuorumPeer
    runFromConfig中分三个步骤:
    1、创建NIO服务端ServerCnxnFactory.createFactory();
    2、getQuorumPeer()方法创建一个QuorumPeer实例对象,将QuorumPeerConfig中的各种参数赋值到quorumPeer对象中
    3、执行quorumPeer.start()方法
    start方法做了六件事:
    1)loadDataBase()冷启动恢复
    2)startServerCnxnFactory()启动NIO服务端
    3)admin.server()
    4)startLeaderElection() 选举
    5)startJVMPauseMonitor() JVM监听器
    6)super.start() QuorumPeer实际是个线程,super.start()跳转到QuorumPeer的run方法

    顶级类

    class ZKDatabase{
    	DataTree{
    		DataNode root;
    		List<DataNode> dns;
    	}
    	FileSnapTxnLog{
    		SnapShot //负责拍摄快照 
    		TxnLog	//负责记录日志 增删改查
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 入口方法:QuorumPeer.loadDataBase();
    • 内部:
      • zkDb.loadDataBase();//zkDb就是ZKDatabase,每个QuorumPeer中都存储了所有的数据,loadDataBase做完恢复后,会得到最大的事务ID(即zkDb.getDataTree().lastProcessedZxid,为什么获取它,因为它是选举过程中的凭证,三个凭证zxid是其中之一)
        • zkDb.loadDataBase()内initialized=true;为ZKDatabase状态标识;
        • zkDb.loadDataBase()内snapLog由SnapShot,TxnLog组成,提供两个方法:restore做恢复,save做快照,DataTree空容器;
          • snapLog.restore内snapLog.deserialize(dt,sessions);进行反序列化
            • snapLog.deserialize内分两部分:一部分加载快照,一部分加载日志
          • restore从快照中恢复绝大部分数据
          • restore从日志中恢复一部分最新数据

    怎么读快照
    快照文件存储了,大量DataNode对象构成一棵树,snap.deserialize做不停迭代去读取DataNode
    FileSnap.deserialize中findNValidSnapshots(100)方法,获取部分最新的合法快照,始终保留的快照文件有3个(保留3个,以防最新的损坏了),返回snapList快照文件列表;
    遍历快照文件snapList,拿到snap快照文件,读取快照文件snap,通过输入流SnapStream包装成CheckedInputStream,然后包装成InputArchive,通过deserialize(dt,sessions,ia)反序列化快照文件(总结:1、构建输入流 2、包装成InputArchive)
    先读取到快照文件的头信息fileheader,如果header.getMagic()!=SNAP_MAGIC(即不等于ZKSN)说明最新快照损坏,可以读取第2个,第3个快照。
    在SerializeUtils.deserializeSnapshot中读取session个数count,恢复session,恢复DataTree,恢复DataTree中path和node是一对,不停的读取path和node
    deserializeResult是快照最大serialize id,为起点

    加载日志文件,highestZxid是从日志中最大zxid,为加载终点。进入fastForwardEdits方法中DataTree已经加载过快照文件,从dt.lastProcessedZxid+1处开始读取日志文件。每条日志数据,代表了一个事务,每个事务有一个zxid,放在header中。先读取header,从header取到zxid。header为空表明日志内没有数据,以快照中最大zxid作为日志最大zxid。header不为空,从header中取zxid和highestZxid做比较,如果比highestZxid大,将header中zxid赋值给highestZxid,因为没读取一个日志替换一次。processTransaction中处理事务头(即header),事务体(即日志文件),进行日志恢复。根据事务头中的类型执行相关动作,创建会话、关闭会话,如果不是会话级别的事务默认走DataTree DataNode节点型事务dt.processTxn(hdr,txn),即processTransaction方法将会话级别的动作处理了。进入processTxn方法,依然根据header.getType()事务类型判断,如果事务时create,执行createNode,以及createTTL、createContainer、delete、deleteContainer、reconfig、setData、setACL、closeSession、error、check、mult等,均是在DataTree中完成的。

    DataTree既是一个java类,也是一个DataNode的容器,也是各类增删改查的方法。

    Session中一个SessionID,一个timeout,当client连接zk1,在session经历一段时间后,zk1挂掉了,然后client轮询到zk2,可以恢复连接zk1时的session继续使用。作用:防止网络波动产生误删的操作。

    /**
         * load the database from the disk onto memory and also add
         * the transactions to the committedlog in memory.
         * // TODO 注释: 这个方法就是 ZK 集群做冷启动数据恢复的入口
         * // TODO 注释: DataTree FileTxnSnapLog
         *
         * @return the last valid zxid on disk
         * @throws IOException
         */
        public long loadDataBase() throws IOException {
            long startTime = Time.currentElapsedTime();
    
            /*************************************************
             *  注释: 重新恢复
             *  1、参数: dataTree 容器
             *  2、zxid 返回值, 代表的是 当前这个 datatree 中的最大的 事务id
             *  snapLog = FileTxnSnapLog
             *  zxid = 100
             *  1、内部重要的两个组成:SnapShot, TxnLog
             *  2、提供了两个方法:restore 做恢复, save 做快照
             *  dataTree 就是一个空容器
             */
            long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    
            // TODO 注释: 表示已经初始化
            initialized = true;
    
            // TODO 注释: 计算数据加载恢复时间
            long loadTime = Time.currentElapsedTime() - startTime;
    
            ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
            LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}", loadTime, Long.toHexString(zxid),
                    dataTree.getTreeDigest());
            return zxid;
        }
    
        /**
         * Fast forward the database adding transactions from the committed log into memory.
         *
         * @return the last valid zxid.
         * @throws IOException
         */
        public long fastForwardDataBase() throws IOException {
            long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
            initialized = true;
            return zxid;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    /**
         * this function restores the server
         * database after reading from the
         * snapshots and transaction logs
         *
         * // TODO_MA 注释: 假设当前 zookeeper 最大 事务 id  = 100
         *
         * @param dt       the datatree to be restored
         * @param sessions the sessions to be restored
         * @param listener the playback listener to run on the
         *                 database restoration
         * @return the highest zxid restored
         * @throws IOException
         * // TODO 注释: 核心步骤
         * // TODO 注释: 1、从快照中恢复 绝大部分数据:1-95
         * // TODO 注释: 2、从日志中,恢复一部分最新数据:96-100
         */
        public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
            long snapLoadingStartTime = Time.currentElapsedTime();
    
            /*************************************************
             *  注释: 从快照恢复绝大部分数据
             *  快照文件恢复: 1-95
             *  里面又包含恢复 session 信息 和 datatree 数据
             *  dt.lastProcessID = 95
             *  snapLog = SnapShot 快照功能
             */
            long deserializeResult = snapLog.deserialize(dt, sessions);
            // TODO 注释: 快照文件存储了:一大堆 DataNode 对象
    
            ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
            FileTxnLog txnLog = new FileTxnLog(dataDir);
            boolean trustEmptyDB;
            File initFile = new File(dataDir.getParent(), "initialize");
            if(Files.deleteIfExists(initFile.toPath())) {
                LOG.info("Initialize file found, an empty database will not block voting participation");
                trustEmptyDB = true;
            } else {
                trustEmptyDB = autoCreateDB;
            }
    
            /*************************************************
             *  注释: 从日志恢复一小部分最新数据
             *  96-100 这五条事务的数据,从日志中执行恢复
             *  扫描日志文件
             *  highestZxid = 100
             *  1、快照文件的最大 zxid = 95
             *  2、加载日志文件的时候,需要一个起点:95, 加载结束之后,得到一个终点最终的最大 zxid = highestZxid = 100
             */
            RestoreFinalizer finalizer = () -> {
                // TODO 注释: highestZxid 就是该方法(restore)的返回值
                long highestZxid = fastForwardFromEdits(dt, sessions, listener);
                // The snapshotZxidDigest will reset after replaying the txn of the
                // zxid in the snapshotZxidDigest, if it's not reset to null after
                // restoring, it means either there are not enough txns to cover that
                // zxid or that txn is missing
                DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
                if(snapshotZxidDigest != null) {
                    LOG.warn(
                            "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, " + "which might lead to inconsistent state",
                            Long.toHexString(highestZxid), Long.toHexString(snapshotZxidDigest.getZxid()));
                }
    
                // TODO 注释: 返回 最大的 zxid
                return highestZxid;
            };
    
            // TODO 注释: 如果还没有快照文件,则执行 ZKDatabase 初始化
            if(-1L == deserializeResult) {
                /* this means that we couldn't find any snapshot, so we need to
                 * initialize an empty database (reported in ZOOKEEPER-2325) */
                if(txnLog.getLastLoggedZxid() != -1) {
                    // ZOOKEEPER-3056: provides an escape hatch for users upgrading
                    // from old versions of zookeeper (3.4.x, pre 3.5.3).
                    if(!trustEmptySnapshot) {
                        throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
                    } else {
                        LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                        return finalizer.run();
                    }
                }
    
                if(trustEmptyDB) {
                    /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() or use Map on save() */
                    save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);
    
                    /* return a zxid of 0, since we know the database is empty */
                    return 0L;
                } else {
                    /* return a zxid of -1, since we are possibly missing data */
                    LOG.warn("Unexpected empty data tree, setting zxid to -1");
                    dt.lastProcessedZxid = -1L;
                    return -1L;
                }
            }
    
            return finalizer.run();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
        /**
         * deserialize the datatree from an inputarchive
         *
         * @param dt       the datatree to be serialized into
         * @param sessions the sessions to be filled up
         * @param ia       the input archive to restore from
         * @throws IOException
         * // TODO 注释: snapfile1: 1-50
         * // TODO 注释: snapfile2: 1-80
         * // TODO 注释: snapfile3: 1-90
         */
        public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    
            // TODO 注释: 先从 fileheader 中读取 SNAP_MAGIC = ZKSN
            FileHeader header = new FileHeader();
            header.deserialize(ia, "fileheader");
    
            // TODO 注释: 如果 魔法值 被破坏了,证明 快照文件失效了。
            if(header.getMagic() != SNAP_MAGIC) {
                throw new IOException("mismatching magic headers " + header.getMagic() + " !=  " + FileSnap.SNAP_MAGIC);
            }
    
            SerializeUtils.deserializeSnapshot(dt, ia, sessions);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
    
            /*************************************************
             *  注释: 先恢复 session
             */
            // TODO 注释: 继续读取 count
            int count = ia.readInt("count");
    
            // TODO 注释: 恢复 session
            while (count > 0) {
    
                // TODO 注释: 读取 id
                long id = ia.readLong("id");
    
                // TODO 注释: 读取 timeout
                int to = ia.readInt("timeout");
    
                // TODO 注释: 恢复 session
                sessions.put(id, to);
    
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(
                        LOG,
                        ZooTrace.SESSION_TRACE_MASK,
                        "loadData --- session in archive: " + id + " with timeout: " + to);
                }
    
                // TODO 注释: 已读取+1,待读取-1
                count--;
            }
    
            /*************************************************
             *  注释: 恢复 DataTree
             */
            dt.deserialize(ia, "tree");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    /*************************************************
         *  注释: 持久化
         *  把内存中的 DataTree 保存在磁盘文件中形成快照文件
         *  DataTree 由一堆 datanode 节点组成的。其实就是把 这一堆 datnode 实例对象,给保存到磁盘文件
         *  datanode之间的关系,就由对应的 path 路径来决定
         *  -
         *  有一个写请求过来: LSM Tree 存储引擎
         *  1、先记录日志 append()
         *  2、然后写数据到内存 datatree 中
         *  3、提交日志 commit()
         *  -
         *  zookeeper 的所有事务请求,全部都是由 leader 严格有序串行执行
         *  来一条事务,执行一条提交一条  buffer flush xxxxxx
         *  -
         *  方法的核心逻辑:
         *  不停的读取一对数据:path, node
         */
        public void deserialize(InputArchive ia, String tag) throws IOException {
            aclCache.deserialize(ia);
            nodes.clear();
            pTrie.clear();
            nodeDataSize.set(0);
    
            /*************************************************
             *  注释: 从快照文件中,依次恢复 znode 节点到 DataTree 中
             *  方式:
             *  1、先读 path
             *  2、再读 node
             *  datatree 在 snapfile 中的组织形式:
             *       path ==> node
             *       path ==> node
             *       ....
             */
            String path = ia.readString("path");
    
            // TODO_MA 注释: 一直不停的读, 读取所有的节点,恢复到 DataTree
            // TODO_MA 马中华 注释: 维护这棵树,维护上下父子节点的关系
            while (!"/".equals(path)) {
    
                DataNode node = new DataNode();
                ia.readRecord(node, "node");
                nodes.put(path, node);
    
                // TODO 注释: path = /a/b/c
                // TODO 注释: node = DataNode1
                // TODO 注释: 动作1:找到父节点:/a/b
                // TODO 注释: 设置当前节点的父节点是 /a/b
                // TODO 注释: 更新父节点的子节点集合:把当前节点加入到父节点的子节点列表中
    
                synchronized (node) {
                    aclCache.addUsage(node.acl);
                }
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1) {
                    root = node;
                } else {
                    // TODO 注释:
                    String parentPath = path.substring(0, lastSlash);
                    // TODO 注释:
                    DataNode parent = nodes.get(parentPath);
                    if (parent == null) {
                        throw new IOException("Invalid Datatree, unable to find " + "parent " + parentPath + " of path " + path);
                    }
                    // TODO 注释:
                    parent.addChild(path.substring(lastSlash + 1));
                    long eowner = node.stat.getEphemeralOwner();
                    EphemeralType ephemeralType = EphemeralType.get(eowner);
                    if (ephemeralType == EphemeralType.CONTAINER) {
                        containers.add(path);
                    } else if (ephemeralType == EphemeralType.TTL) {
                        ttls.add(path);
                    } else if (eowner != 0) {
                        HashSet<String> list = ephemerals.get(eowner);
                        if (list == null) {
                            list = new HashSet<String>();
                            ephemerals.put(eowner, list);
                        }
                        list.add(path);
                    }
                }
    
                // TODO 注释: 再读一个path,如果 path 为空,证明 znode 节点都恢复完了
                path = ia.readString("path");
            }
    
            // have counted digest for root node with "", ignore here to avoid
            // counting twice for root node
            nodes.putWithoutDigest("/", root);
    
            // TODO 注释: 计算总结点数
            nodeDataSize.set(approximateDataSize());
    
            // we are done with deserializing the the datatree
            // update the quotas - create path trie and also update the stat nodes
            setupQuota();
    
            // TODO 注释: 去重无用的 acl 信息
            aclCache.purgeUnused();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
  • 相关阅读:
    hiredis笔记
    C#单向链表实现非升序插入方法:用LinkedList<int>链表类 vs 自定义单向链表类及非升序插入方法
    【能效管理】电力监控系统在某商业数据中心的应用分析
    UI设计就业前景到底好不好?
    Linux手动更新时间Linux同步集群其他节点时间
    div+css 设备看板样式
    46、TCP的“三次握手”
    OpenHarmony网络协议通信—libevent [GN编译] - 事件通知库
    Python 读写 Excel 文件
    几种嵌入式可编程芯片的简介
  • 原文地址:https://blog.csdn.net/qq_36679460/article/details/127603925