• zookeeper源码(09)follower处理客户端请求


    在zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:

    • 读请求处理
    • 写请求转发与响应

    follower接收转发客户端请求

    网络层接收客户端数据包

    leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。

    在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:

    • processConnectRequest方法:创建session
    • processPacket方法:处理业务请求

    processConnectRequest创建session

    • 会使用sessionTracker生成sessionId、创建session对象
    • 生成一个密码
    • 提交一个createSession类型Request并提交给业务处理器
    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        // 生成sessionId、创建session对象
        long sessionId = sessionTracker.createSession(timeout);
        // 生成密码
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        // 提交createSession类型Request
        CreateSessionTxn txn = new CreateSessionTxn(timeout);
        cnxn.setSessionId(sessionId);
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
        submitRequest(si);
        return sessionId;
    }
    

    processPacket处理业务请求

    • 封装Request
    • 验证largeRequest
    • 提交业务层处理器
    Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
    int length = request.limit();
    if (isLargeRequest(length)) {
        // checkRequestSize will throw IOException if request is rejected
        checkRequestSizeWhenMessageReceived(length);
        si.setLargeRequestSize(length);
    }
    si.setOwner(ServerCnxn.me);
    submitRequest(si);
    

    FollowerRequestProcessor处理器

    在follower端,客户端请求会由FollowerRequestProcessor处理:

    1. 把请求提交下游CommitProcessor处理器
    2. 写请求转发给leader处理
    3. 读请求经过CommitProcessor直接转发给FinalRequestProcessor处理器,直接查询数据返回给客户端
    public void run() {
        try {
            while (!finished) {
    
                Request request = queuedRequests.take();
    
                // Screen quorum requests against ACLs first 略
    
                // 转发给CommitProcessor处理器
                // 提交到queuedRequests队列
                // 写请求还会提交到queuedWriteRequests队列
                maybeSendRequestToNextProcessor(request);
    
                // ...
    
                // 写请求需要转发给leader处理
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request); // 待同步命令
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    if (!request.isLocalSession()) {
                        zks.getFollower().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
    }
    

    转发leader

    zks.getFollower().request(request);
    

    Learner转发请求:

    void request(Request request) throws IOException {
        // 略
    
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId); // sessionId
        oa.writeInt(request.cxid); // 客户端xid
        oa.writeInt(request.type); // 业务类型
        byte[] payload = request.readRequestBytes(); // 请求体
        if (payload != null) {
            oa.write(payload);
        }
        oa.close();
        // 封装REQUEST数据包
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
        writePacket(qp, true); // 通过网络发给leader服务器
    }
    

    leader处理follower请求

    LearnerHandler接收REQUEST请求

    case Leader.REQUEST:
        bb = ByteBuffer.wrap(qp.getData());
        sessionId = bb.getLong(); // 解析请求信息
        cxid = bb.getInt();
        type = bb.getInt();
        bb = bb.slice();
        Request si;
        if (type == OpCode.sync) {
            si = new LearnerSyncRequest(
                this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
        } else {
            si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
        }
        si.setOwner(this); // 用来判断请求来自follower
        learnerMaster.submitLearnerRequest(si); // 提交给业务处理器
        requestsReceived.incrementAndGet();
    

    submitLearnerRequest提交业务处理器:

    public void submitLearnerRequest(Request si) {
        zk.submitLearnerRequest(si);
    }
    

    LeaderZooKeeperServer提交业务处理器:

    public void submitLearnerRequest(Request request) {
        // 提交给PrepRequestProcessor处理器
        prepRequestProcessor.processRequest(request);
    }
    

    从此处开始走leader处理写请求流程。

    leader处理写请求流程回顾

    • PrepRequestProcessor - 做事务设置
    • ProposalRequestProcessor - 发起proposal,将Request转发给SyncRequestProcessor写事务log、本地ack
    • CommitProcessor - 读请求直接调用下游处理器,写请求需要等待足够的ack之后commit再调用下游RequestProcessor处理器
    • ToBeAppliedRequestProcessor - 维护toBeApplied列表
    • FinalRequestProcessor - 把事务应用到ZKDatabase,提供查询功能,返回响应

    follower处理leader数据

    在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。

    PROPOSAL数据包

    fzk.logRequest(hdr, txn, digest);
    

    logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。

    leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。

    COMMIT数据包

    fzk.commit(qp.getZxid());
    

    CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。

    FinalRequestProcessor处理器

    • 把事务应用到ZKDatabase中
    • 提供查询功能
    • 给客户端返回响应
  • 相关阅读:
    python基础(五)----时间模块
    神经网络各层的作用
    MobileViT——论文简述
    【如何学习CAN总线测试】——AUTOSAR网络管理测试
    阿里云负载均衡SLB,HTTPS动态网站部署负载均衡,实现高并发流量分发
    (微服务多模块)Springboot+Security+Redis+JWT 仅需一招
    智能abc是什么输入法:win10可用的智能abc输入法免费下载
    基于Springboot的宠物医院管理系统-JAVA【数据库设计、论文、源码、开题报告】
    java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis
    CV复习:上/下采样
  • 原文地址:https://www.cnblogs.com/xugf/p/18033571