阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

ZooKeeper源码分析:Quorum请求的整个流程

226次阅读
没有评论

共计 26387 个字符,预计需要花费 66 分钟才能阅读完成。

Quorum 请求是转发给 Leader 处理,并且需要得一个 Follower Quorum 确认的请求。这些请求包括:

1)znode 的写操作(OpCode.create,OpCode.delete,OpCode.setData,OpCode.setACL)

2)Session 的创建和关闭操作(OpCode.createSession 和 OpCode.closeSession)

3)OpCode.multi 操作。

本文分析了 Client, Follower 和 Leader 协同完成 Quorum 请求的过程。另外需注意的是 OpCode.sync 请求也需要转发给 Leader, 但不需要得到一个 Follower Quorum 确认。本文也会提到 OpCode.sync 操作。

数据结构

Request 类型对象:Server 内部传递的数据结构。

属性说明
sessionId 会话 ID
cxid客户端事务 ID
type操作类型, 如 OpCode.setData
request请求 Record 对象,如 SetDataRequest
cnxnServer 和 Client 端的连接对象
hdr请求事务头 TxnHeader
txn请求事务体 Record,如 OpCode.setData 请求,则是 SetDataTxn 类型对象
zxidZooKeeper 事务 ID
authInfo认证信息
createTime创建时间
owner所有者
e处理过程中的异常

QuorumPacket 类型对象:用于 ZooKeeper 服务器之间传递的数据包。

属性说明
typeQuorumPacket 类型,如 Leader.REQUEST和 Leader.ACK 等
zxidZooKeeper 事务 ID
data数据包的数据:
在 Leader.REQUEST 中,数据依次如下:
Request.sessionId
Request.cxid
Request.type
Request.request
在 Leader.PROPOSAL 中,数据依次如下:
Request.hdr
Request.txn
在 Leader.ACK 中,为 null
在 Leader.COMMIT 中,为 null
authinfo认证信息

 

Quorum 请求流程

假设拓扑结构如下图,Client A 和 Follower A 建立连接。

ZooKeeper 源码分析:Quorum 请求的整个流程

 

数据流程图如下。在图中,连接线说明前的数字表示事件发的生时序,主时序是直接使用一个数字表示,并且数字越小表示越早发生(如 1 Client Request 是在 2 Request 之前发生)。对于和主时序并发的操作使用主时序序号后加上一个括号括起来的数字表示,如 7(1)-n Request 指和 7 Request 是并发的。7(1)- n 中 n 表示以 7(1)开头的操作时序。

ZooKeeper 源码分析:Quorum 请求的整个流程

我们从数据流程图中 Step 1 讲起:Client A 发起一个 Quorum 请求给 Follower A。

【Client A, Step 1】Client A 调用 Quorum 请求对应的方法:

如调用 Zookeeper 的构造函数,会发起 OpCode.createSession 请求,

如调用 Zookeeper.setData 方法,会发起 OpCode.setData 操作。

最终会调用 ClientCnxn.submitRequest 方法将请求放入 outgoingQueue 队列中,并阻塞等待 Follower A 反馈。而 ClientCnxn.SendThread 线程会从 outgoingQueue 中取出请求,并发送给 Follower A。

下面代码 Zookeeper.setData 方法: Client A 构建对象发送给 Follower A

    public Stat setData(final String path, byte data[], int version)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils. validatePath(clientPath);

        // 通过传入的 path 构造完整 serverPath
        final String serverPath = prependChroot(clientPath);

        // 构造一个 Request 头
        RequestHeader h = new RequestHeader();
        // 设置类型为 setData
        h.setType(ZooDefs.OpCode.setData);
        // 构造一个 SetData 请求体
        SetDataRequest request = new SetDataRequest();
        // 设置需要修改 node 的 serverPath
        request.setPath(serverPath);
        // 设置需要修改的 node 的 data
        request.setData(data);
        // 设置需要修改的 node 的 version
        request.setVersion(version);
       
        // 构建 SetDataResponse 对象
        SetDataResponse response = new SetDataResponse();

        // 提交请求,并等待返回结果
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        // 如果 r.getErr() 不能 0,则表示有错误,抛出异常
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                    clientPath);
        }
        return response.getStat();
    }
【Follower A, Step 2,3】Follower A 的 NIOServerCnxn 类接到了 Client A 的请求,会调用 ZookeeperServer.processPacket 方法。该方法会构建一个 Request 对象,并调用第一个处理器 FollowerRequestProcessor 的 processRequest 方法。该方法将 Request 对象放入 FollowerRequestProcessor.queuedRequests 队列中。FollowerRequestProcessor 处理器线程会循环从 FollowerRequestProcessor.queuedRequests 队列中取出 Request 对象,并继续下面步骤:

1)调用下一个处理器 CommitProcessor 的 processRequest 方法。该方法将 Request 对象放入 CommitProcessor.queuedRequests 队列中;

2)通过 Request.type 判断 Request 类型。若发现是一个 Quorum 请求,会直接调用 Learner.request(request)方法。该方法将 Request 对象封装成一个 Leader.Request 的 Quorum 数据包,并发送给 Leader。

OpCode.sync 操作也将调用 Learner.request 方法将请求转发给 Leader,但在这之前会先将 Request 对象加入到 pendingSyncs 队列中。

FollowerRequestProcessor 的 run 方法如下:

    public void run() {
        try {
            while (!finished) {
                // 从 queuedRequests 队列中取出 Request 对象
                Request request = queuedRequests .take();
                if (LOG .isTraceEnabled()) {
                    ZooTrace. logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK ,
                            ‘F’ , request, “” );
                }
                // 当 request 是 Request.requestOfDeath,一个 poison pill, 就退出 while 循环,
                // 并结束 FollowerRequestProcessor 线程
                if (request == Request.requestOfDeath) {
                    break ;
                }

                // 我们在提交这个 request 到 leader 之前,把这个 request 传递到下一个处理器。
                // 这样我们就准备好从 Leader 那得到 Response
                nextProcessor.processRequest(request);

                // 只有 Quorum 操作和 sync 操作才会调用 Follower.request 方法, 转发 Leader.REQUEST 数据包给 Leader
                //sync 操作和 Quorum 操作有一些不同,
                // 我们需要保持跟踪这个 sync 操作对于的 Follower 已经挂起,所有我们将它加入 pendingSyncs 队列中。
                switch (request.type) {
                case OpCode.sync:
                    // 将 OpCode.sync 放入 pendingSyncs 队列中
                    zks.pendingSyncs .add(request);
                    zks.getFollower().request(request);
                    break ;
                case OpCode.create:
                case OpCode.delete:
                case OpCode.setData:
                case OpCode.setACL:
                case OpCode.createSession:
                case OpCode.closeSession:
                case OpCode.multi:
                    //Quorum 请求,直接调用 Folloer.request 方法
                    zks.getFollower().request(request);
                    break ;
                }
            }
        } catch (Exception e) {
            LOG.error(“Unexpected exception causing exit” , e);
        }
        LOG.info(“FollowerRequestProcessor exited loop!”);
    }
【Leader A, Step 4】Leader A 的 LearnerHandler 线程会循环读取从 Learner 获得的 Quorum 数据包。如果数据包是 Learner.REQUEST 类型,则会解析 Quorum 数据包的内容,检查操作类型。
如果操作类型不是 OpCode.sync, 则会构造 Request 对象。并调用 ZooKeeperServer.submitRequest 方法(和上面 Follower 接收到请求所使用的 submitRequest 方法是同一个方法),并最终会调用第一个处理器 PrepRequestProcessor 的 submitRequest 方法,将 Request 对象放入 PrepRequestProcessor.submittedRequests 队列中。

如果操作类型是 OpCode.sync, 会构造 Request 类型的子类 LearnerSyncRequest 对象,并同样调用 PrepRequestProcessor 的 submitRequest 方法。

LearnerHandler.run 方法中对 Leader.REQUEST 数据包的处理代码如下:

    public void run () {
        ……
        case Leader.REQUEST :
          bb = ByteBuffer. wrap(qp .getData());
          // 从 QuorumPacket 中读取 sesssionId
          sessionId = bb.getLong();
          // 从 QuorumPacket 中读取 cxid
          cxid = bb.getInt();
          // 从 QuorumPacket 中读取操作类型
          type = bb.getInt();
          bb = bb.slice();
          Request si;
          // 如果操作 Code 的类型是 OpCode.sync, 则构造 LearnerSyncRequest 对象
          if (type == OpCode.sync){
              si = new LearnerSyncRequest(this , sessionId, cxid, type , bb, qp.getAuthinfo());
          }
            // 如果操作 Code 的类型不是 OpCode.sync, 则构造 Request 对象
          else {
              si = new Request(null , sessionId, cxid, type , bb, qp.getAuthinfo());
          }

          // 设置 owner
          si.setOwner(this);
          // 提交请求
          leader.zk .submitRequest(si);
          break ;
      ……
  }
PrepRequestProcessor 处理器线程会从 PrepRequestProcessor.submittedRequests 队列中取出 Request 对象,并根据 Request 类型构建 TxnHeader 和 Record 对象,然后分别赋给 Request.hdr 和 Request.txn。之后会调用下一个处理器 ProposalRequestProcessor 的 processRequest 方法,将 Request 对象传递给处理器 ProposalRequestProcessor。(如果发现有异常会则会创建一个错误 Record 类型对象)

PrepRequestProcessor 的 run 方法如下:

    public void run() {
        try {
            while (true) {
                // 从 submittedRequests 队列中取去第一个 request 对象
                Request request = submittedRequests .take();
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                // 如果是 OpCode.ping 操作,则将 traceMask 设置成 ZooTrace. CLIENT_PING_TRACE_MASK
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace. CLIENT_PING_TRACE_MASK;
                }
                if (LOG .isTraceEnabled()) {
                    ZooTrace. logRequest(LOG, traceMask, ‘P’ , request, “”);
                }
                // 如果 request 是一个 requestOfDeath, 则退出 while 循环。
                if (Request.requestOfDeath == request) {
                    break ;
                }
                // 处理请求
                pRequest(request);
            }
        } catch (InterruptedException e) {
            LOG.error(“Unexpected interruption” , e);
        } catch (RequestProcessorException e) {
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            LOG.error(“Unexpected exception” , e);
        } catch (Exception e) {
            LOG.error(“Unexpected exception” , e);
        }
        LOG.info(“PrepRequestProcessor exited loop!”);
    } 
   
PrepRequestProcessor 的 pRequest2Txn 方法,该方法会在 pRequest 方法中调用,构建 TxnHeader 和 Record 对象。下面是关于 OpCode.setData 请求的代码:

    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.hdr = new TxnHeader(request.sessionId , request.cxid, zxid,
                                    zks.getTime(), type);

        switch (type) {
            …..
            case OpCode.setData:
                // 检查 session
                zks.sessionTracker .checkSession(request.sessionId, request.getOwner());
                // 将 record 转成 SetDataRequest 类型
                SetDataRequest setDataRequest = (SetDataRequest)record;
                if (deserialize)
                    // 将 Request.reques 数据反序列化成 setDataRequest 对象
                    ByteBufferInputStream.byteBuffer2Record(request. request, setDataRequest);
                // 获取需要需要修改的 znode 的 path
                path = setDataRequest.getPath();
                // 获取内存数据中获取 path 对于的 znode 信息
                nodeRecord = getRecordForPath(path);
                // 检查对 znode 是否有写权限
                checkACL(zks, nodeRecord .acl , ZooDefs.Perms.WRITE,
                        request.authInfo);
                // 获取客户端设置的版本号
                version = setDataRequest.getVersion();
                // 获取节点当前版本号
                int currentVersion = nodeRecord.stat.getVersion();
                // 如果客户端设置的版本号不是 -1,且不等于当前版本号,则抛出 KeeperException.BadVersionException 异常
                if (version != -1 && version != currentVersion) {
                    throw new KeeperException .BadVersionException(path);
                }
                //version 等于当前版本加 1
                version = currentVersion + 1;
                // 构建 SetDataTxn 对象,并赋给 request.txn
                request. txn = new SetDataTxn(path, setDataRequest.getData(), version);
                // 拷贝 nodeRecord
                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                // 将 nodeRecord 的当前版本号设置为 version
                nodeRecord.stat.setVersion(version);
                // 将 nodeRecord 放入 outstandingChanges
                //path 和 nodeRecord map 放入 outstandingChangesForPath
                addChangeRecord(nodeRecord);
                break ;
          ……
        }
    }
【Leader A, Step 5,6】处理器 ProposalRequestProcessor 会先判断 Request 对象是否是 LearnerSyncRequest 类型。

如果不是 LearnerSyncRequest 类型(也就是 Quorum 请求),会按如下步骤执行:

1)调用下一个处理器 CommitProcessor 的 processRequest 方法,将 Request 对象放入 CommitProcessor.queuedRequests 队列中;

2)将 proposal 发送到所有的 Follower;

3)调用 SyncRequestProcessor 处理器的 processRequest 方法。该方法会将请求放入 SyncRequestProcessor.queuedRequests 队列中。(【Leader A, Step 7(1)】SyncRequestProcessor 线程会记录 Log, 然后传递给 SendAckRequestProcessor。SendAckRequestProcessor 会发送一个 Leader.ACK 的 Quorum 数据包给自己)

如果是 LearnerSyncRequest 类型,说明该请求是 OpCode.sync 操作,则会直接调用 Leader.processSync 方法。

ProposalRequestProcessor 的 processRequest 方法如下:

    public void processRequest(Request request) throws RequestPrzocessorException {
        // 如果是 sync 操作,则调用 Leader.processSync 方法
        if (request instanceof LearnerSyncRequest){
            zks.getLeader().processSync(( LearnerSyncRequest)request);
        }
        // 如果不是 sync 操作
        else {
            // 传递到下一个处理器
            nextProcessor.processRequest(request);
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
                    // 发送 proposal 给所有的 follower
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException (e.getMessage(), e);
                }
                // 调用 SyncRequestProcessor 处理器的 processRequest 方法
                syncProcessor.processRequest(request);
            }
        }
    }
Leader 的 propose 方法如下:
    /**
    * 创建 Proposal,并发送给所有的 members
    * @param request
    * @return the proposal that is queued to send to all the members
    */
    public Proposal propose(Request request) throws XidRolloverException {

        // 解决 rollover 的问题,所有低 32 位重置表示一个新的 leader 选择。强制重新选择 Leader。
        //See ZOOKEEPER- 1277
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                    “zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start”;
            shutdown(msg);
            throw new XidRolloverException (msg);
        }
     
        // 将 request.hdr 和 request.txn 序列化到 boa 中
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive. getArchive(baos);
        try {
            request.hdr.serialize(boa, “hdr”);
            if (request.txn != null) {
                request. txn.serialize(boa, “txn”);
            }
            baos.close();
        } catch (IOException e) {
            LOG.warn(“This really should be impossible” , e);
        }
        // 构造 Leader.PROPOSAL 的 QuorumPacket
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);
        // 构造 Proposal 对象
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this) {
            if (LOG .isDebugEnabled()) {
                LOG.debug(“Proposing:: ” + request);
            }

            // 获得 packet 的 zxid, 并放入 outstandingProposals 未完成 Proposal Map 中
            lastProposed = p.packet.getZxid();
            // 将 p 加入到 outstandingProposals Map 中
            outstandingProposals.put(lastProposed , p);
            // 发送给所有的 Follower
            sendPacket(pp);
        }
        return p;
    }   
Follower.processPacket 方法如下:

    /**
    * 检查在 qp 中接收到的 packet, 并根据它的内容进行分发
    * @param qp
    * @throws IOException
    */
    protected void processPacket(QuorumPacket qp) throws IOException{
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break ;
        case Leader.PROPOSAL:
            TxnHeader hdr = new TxnHeader();
            // 从数据包 qp 中反序列化出 txn
            Record txn = SerializeUtils . deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn(“Got zxid 0x”
                        + Long. toHexString(hdr.getZxid())
                        + ” expected 0x”
                        + Long. toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break ;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break ;
        case Leader.UPTODATE:
            LOG.error(“Received an UPTODATE message after Follower started”);
            break ;
        case Leader.REVALIDATE:
            revalidate(qp);
            break ;
        case Leader.SYNC:
            fzk.sync();
            break ;
        }
        }   
FollowerZooKeeperServer 的 logRequest 方法如下:

    public void logRequest(TxnHeader hdr, Record txn) {
        // 构建 Request 对象
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null , null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        // 如果 request.zxid 的低 32 为不全为 0, 则加入 pendingTxns 队列中
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        // 调用 SyncRequestProcessor 处理这个 request
        syncProcessor.processRequest(request);
        } 
【All Followers, Step 8】处理器 SyncRequestProcessor 的功能和 Leader 的 SyncRequestProcessor 一样,将请求记录到日志中,然后将 Request 请求传递给下一个处理器。不过 Follower 的下一个处理器是 SendAckRequestProcessor。该处理器会构建一个 Leader.ACK 的 Quorum 数据包,并发送给 Leader。

SendAckRequestProcessor 的 processRequest 方法如下:

    public void processRequest(Request si) {
        if (si.type != OpCode.sync){
            // 构建 Leader.ACK Quorum 包
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null ,
                null );
            try {
                // 将 Leader.ACK Quorum 数据包发送给 Leader
                learner.writePacket(qp, false);
            } catch (IOException e) {
                LOG.warn(“Closing connection to leader, exception during packet send”, e);
                try {
                    if (!learner .sock .isClosed()) {
                        learner.sock .close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug(“Ignoring error closing the connection” , e1);
                }
            }
        }
    }
【Leader A, Step 9】LearnerHandler 线程循环读取从 Learner 那获得的 Quorum 数据包。当发现是从 Follower 传输过来的 Leader.ACK 类型数据包,则会调用 Leader.processAck 方法进行处理。在 Leader.processAck 方法中,若已经有一个 Follower Quorom 发送了 Leader.ACK 数据包,则会执行下列三步骤:

1)调用 Leader.commit 方法,发送 Leader.COMMIT 类型 Quorum 数据包给所有 Follower;

2)调用 Leader.inform 方法,通知所有的 Observer;

3)调用处理器 CommitRequestProcessor.commit 方法,将 Request 对象放到 CommitRequestProcessor.committedRequests 队列中。(【Leader A, Step 10(1)-1,10(1)-2】CommitProcessor 线程会从 CommitRequestProcessor.committedRequests 队列中取出提交的 Request 对象,发现是和 nextPending 是一致的,然后提交的 Request 对象内容替换 nextPending 的内容,并将 nextPending 放入到 toProcess 队列中。下一次循环会从 toProcess 队列中取出 nextPending,然后调用下一个处理器 Leader.ToBeAppliedRequestProcessor 的 processRequest 方法。该方法会调用下一个处理器 FinalRequestProcessor 的 processRequest 方法。FinalRequestProcessor.processRequest 方法并根据 Request 对象中的操作更新内存中 Session 信息或者 znode 数据。)

Leader 的 processAck 方法如下:

    /**
    * 保存某个 proposal 接收到的 Ack 数量
    *
    * @param zxid
    *                被发送的 proposal 的 zxid
    * @param followerAddr
    */
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
        if (LOG .isTraceEnabled()) {
            LOG.trace(“Ack zxid: 0x{}” , Long.toHexString (zxid));
            for (Proposal p : outstandingProposals .values()) {
                long packetZxid = p.packet.getZxid();
                LOG.trace(“outstanding proposal: 0x{}” ,
                        Long. toHexString(packetZxid));
            }
            LOG.trace(“outstanding proposals all”);
        }

        // 如果 zxid 的低 32 位都是 0, 则直接 return
        if ((zxid & 0xffffffffL) == 0) {
            /*
            * We no longer process NEWLEADER ack by this method. However,
            * the learner sends ack back to the leader after it gets UPTODATE
            * so we just ignore the message.
            */
            return ;
        }
        // 如果没有未完成的 proposal, 则直接 return
        if (outstandingProposals .size() == 0) {
            if (LOG .isDebugEnabled()) {
                LOG.debug(“outstanding is 0”);
            }
            return ;
        }
        // 如果最近提交的 proposal 的 zxid 比 ack 的 proposal 的 zxid 大,说明 ack 的 proposal 已经提交了, 则直接 return
        if (lastCommitted >= zxid) {
            if (LOG .isDebugEnabled()) {
                LOG.debug(“proposal has already been committed, pzxid: 0x{} zxid: 0x{}”,
                        Long. toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return ;
        }
        // 根据 zxid 取出 proposal 对象
        Proposal p = outstandingProposals .get(zxid);
        // 如果在未完成列表 outstandingProposal 中没有找到 zxid 对于的 proposal, 则说明该 zxid 对于的 Proposal 还没有处理。
        if (p == null) {
            LOG.warn(“Trying to commit future proposal: zxid 0x{} from {}”,
                    Long. toHexString(zxid), followerAddr );
            return ;
        }
      // 将发送 ack 的 Follower 的 sid 放入 Proposal.ackSet 集合中
        p. ackSet.add(sid);
        if (LOG .isDebugEnabled()) {
            LOG.debug(“Count for zxid: 0x{} is {}” ,
                    Long. toHexString(zxid), p.ackSet.size());
        }
        // 如果 ackSet 集合中已经包含了一个 Quorum
        if (self .getQuorumVerifier().containsQuorum(p.ackSet)){
            if (zxid != lastCommitted +1) {
                LOG.warn(“Commiting zxid 0x{} from {} not first!” ,
                        Long. toHexString(zxid), followerAddr );
                LOG.warn(“First is 0x{}” , Long.toHexString (lastCommitted + 1));
            }
            // 从 outstandingProposals 中删除掉这个 zxid 对于的 proposal 对象
            outstandingProposals.remove(zxid);
            // 如果 p.request 不等于 null, 则将这个 proposal 放入 toBeApplied 列表中
            if (p.request != null) {
                toBeApplied.add(p);
            }

            if (p.request == null) {
                LOG.warn(“Going to commmit null request for proposal: {}”, p);
            }
            // 发送 Leader.COMMIT 包给所有的 Follower
            commit(zxid);
            // 通知所有的 Observer
            inform(p);
            // 调用处理器 CommitProcessor 的 commit 方法
            zk. commitProcessor.commit(p.request);
            // 如果有 sync 等着等待这个 commit 的 zxid,发送 Leader.SYNC 数据包给对应的 Follower
            if (pendingSyncs .containsKey(zxid)){
                for (LearnerSyncRequest r: pendingSyncs .remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }
【All Follower, Step 10】Follower.followLeader 方法会循环读取从 Leader 的传输过来的 Quorum 数据包,并调用 Follower.processPacket 方法。该方法会根据数据的内容来分发。当发现是 Leader.COMMIT 类型的 Quorum 数据包,则会根据 Quorum 数据包的内容构造一个 Request 对象,并调用 FollowerZooKeeperServer.commit 方法。该方法最终会调用处理器 CommitRequestProcessor.commit 方法,将 Request 对象放到 CommitRequestProcessor.committedRequests 队列中。

FollowerZooKeeperServer.commit 方法如下:

    /**
    * 当接收到一个 COMMIT 消息,这个方法会被调用。该方法会将 COMMIT 消息
    * 中的 zxid 和 pendingTxns 队列中的第一个对象的 zxid 进行匹配。如何相同,则
    * 传递给处理器 CommitProcessor 进行 commit
    * @param zxid – must correspond to the head of pendingTxns if it exists
    */
    public void commit(long zxid) {
        if (pendingTxns .size() == 0) {
            LOG.warn(“Committing ” + Long. toHexString (zxid)
                    + ” without seeing txn” );
            return ;
        }
        // 取��pendingTxns 第一个元素的 zxid
        long firstElementZxid = pendingTxns .element().zxid;
        // 如果第一个元素的 zxid 不等于 COMMIT 消息中的 zxid, 则退出程序
        if (firstElementZxid != zxid) {
            LOG.error(“Committing zxid 0x” + Long. toHexString (zxid)
                    + ” but next pending txn 0x”
                    + Long. toHexString(firstElementZxid));
            System. exit(12);
        }
        //pendingTxns 取出,并删除第一个元素
        Request request = pendingTxns .remove();
        // 将从 pendingTxns 队列中取出的第一个 reqeust 对象传递给 CommitProcessor 处理器进行 commit
        commitProcessor.commit(request);
    }
【All Follower, Step 11】处理器 CommitProcessor 线程会处理提交的 Request 对象。

如果是 Follower A, nextPending 对象是和提交 Request 对象是一致的,所以将提交 Request 对象内容替换 nextPending 中的内容,并放入 toProcess 队列中。在下一个循环会从 toProcess 队列中取出并传递到下一个迭代器 FinalRequestProcessor 中。(和 Leader 中的 CommitProcessor 线程处理逻辑是一样的)

如果不是 Follower A, 则可能有下面两种情况:

1)queuedRequest 队列为 empty 且 nextPending 为 null, 也就是这个 Follower 没有自己转发的 request 正在处理;

2)nextPending 不为 null, 也就是有转发的 request 正在处理。但 nextPending 对象一定和提交的 Request 对象是不一致的。

不管是哪一种,都会直接将提交的 Request 对象加入到 toProcess 队列中。处理器 CommitProcessor 线程会从中取出并传递到下一个迭代器 FinalRequestProcessor 中。

CommitProcessor.run 方法如下:

    public void run() {
        try {
            Request nextPending = null;
            while (!finished) {
                int len = toProcess .size();
                for (int i = 0; i < len; i++) {
                    nextProcessor.processRequest(toProcess .get(i));
                }
                // 当将所有的 request 传递到下一个处理器 FinalRequestProcessor 后,将 toProcess 清空
                toProcess.clear();
                synchronized (this) {
                    // 如果 queuedRequests 队列为空,或者 nextPending 为 null, 或者 committedRequest 队列为控股,则等待。
                    if ((queuedRequests .size() == 0 || nextPending != null )
                            && committedRequests.size() == 0) {
                        wait();
                        continue ;
                    }
                    // 第一步,检查这个 commit 是否为了 pending request 而来
                    // 如果 commit request 到来,但是 queuedRequests 为空,或者 nextPending 为 null
                    if ((queuedRequests .size() == 0 || nextPending != null )
                            && committedRequests.size() > 0) {
                        Request r = committedRequests .remove();
                        /*
                        * We match with nextPending so that we can move to the
                        * next request when it is committed. We also want to
                        * use nextPending because it has the cnxn member set
                        * properly.
                        */
                        // 如果 nextPending 不等于 null,
                        if (nextPending != null
                                && nextPending. sessionId == r.sessionId
                                && nextPending. cxid == r.cxid ) {
                            // we want to send our version of the request.
                            // the pointer to the connection in the request
                            nextPending.hdr = r. hdr;
                            nextPending. txn = r.txn ;
                            nextPending. zxid = r.zxid ;
                            toProcess.add(nextPending);
                            nextPending = null ;
                        } else {
                            // this request came from someone else so just
                            // send the commit packet
                          // 如果这个请求来自于其他人,则直接加入到 toProcess 中
                          //sync 请求,或者不是 Follower 发起的请求
                            toProcess.add(r);
                        }
                    }
                }

                // 如果我们还没有匹配上 pending request, 则返回继续等待
                if (nextPending != null) {
                    continue ;
                }

                synchronized (this) {
                    // 处理 queuedRequests 中下一个请求
                    while (nextPending == null && queuedRequests.size() > 0) {
                        // 从 queuedRequests 中取出第一个,并将其从队列中删除
                        Request request = queuedRequests .remove();
                        switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            // 如果不是 OpCode.sync 操作,则将 request 对象赋予 nextPending
                            nextPending = request;
                            break ;
                        case OpCode.sync:
                            if (matchSyncs) {
                                nextPending = request;
                            }
                            // 如果 matchSyncs 等于 false, 则直接加入到 toProcess, 不等待 Commit
                            else {
                                toProcess.add(request);
                            }
                            break ;
                        default :
                            toProcess.add(request);
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn(“Interrupted exception while waiting” , e);
        } catch (Throwable e) {
            LOG.error(“Unexpected exception causing CommitProcessor to exit”, e);
        }
        LOG.info(“CommitProcessor exited loop!”);
    }
 

【All Follower, Step 12】处理器 FinalRequestProcessor 更新内存中 Session 信息或者 znode 数据。

对于 Follower A,将会构建 Reponse,并返回 Response 给 Client A;

对于其它的 Follower, 不需要返回 Response 给客户端,直接返回。

 

 

 

 

 

FinalRequestProcessor.processRequest 方法如下。其中构造 Response 部分,只给出了 SetData 请求相关的代码。

    public void processRequest(Request request) {
        if (LOG .isDebugEnabled()) {
            LOG.debug(“Processing request:: ” + request);
        }
        // request.addRQRec(“>final”);
        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
        if (request.type == OpCode.ping) {
            traceMask = ZooTrace. SERVER_PING_TRACE_MASK;
        }
        if (LOG .isTraceEnabled()) {
            ZooTrace. logRequest(LOG, traceMask, ‘E’ , request, “”);
        }
        ProcessTxnResult rc = null ;
        synchronized (zks.outstandingChanges) {
          // 循环从 outstandingChanges 中取出小于等于 request.zxid 的 ChangeRecord,并删除
            while (!zks .outstandingChanges .isEmpty()
                    && zks.outstandingChanges .get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges .remove(0);
                if (cr.zxid < request.zxid) {
                    LOG.warn(“Zxid outstanding “
                            + cr. zxid
                            + ” is less than current ” + request.zxid );
                }
                if (zks .outstandingChangesForPath .get(cr.path) == cr) {
                    zks.outstandingChangesForPath .remove(cr.path);
                }
            }
 
            // 如果 request.hdr 不等于 null, 则在内存 Datatree 中处理这个请求
            if (request.hdr != null) {
              TxnHeader hdr = request. hdr;
              Record txn = request. txn;
 
              rc = zks.processTxn(hdr, txn);
            }
            // 检测这个 request 的类型是否是需要 Quorum Ack 的 requrest
            // 如果是,加入到 committedProposal 中
            if (Request. isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
 
        if (request.hdr != null && request.hdr.getType() == OpCode.closeSession ) {
            ServerCnxnFactory scxn = zks.getServerCnxnFactory();
            if (scxn != null && request.cnxn == null) {
                scxn.closeSession(request. sessionId);
                return ;
            }
        }
        // 如果 request 的 cnxn 为 null, 则直接 return
        if (request.cnxn == null) {
            return ;
        }
 
        // 下面是构造 response
        ServerCnxn cnxn = request. cnxn;
 
        String lastOp = “NA” ;
        zks.decInProcess();
        Code err = Code . OK;
        Record rsp = null;
        boolean closeSession = false;
        try {
            if (request.hdr != null && request.hdr.getType() == OpCode.error) {
                throw KeeperException.create(KeeperException.Code. get( (
                        (ErrorTxn) request. txn) .getErr()));
            }
 
            KeeperException ke = request.getException();
            if (ke != null && request.type != OpCode. multi) {
                throw ke;
            }
 
            if (LOG .isDebugEnabled()) {
                LOG.debug(“{}” ,request);
            }
            switch (request.type) {
            ……
            case OpCode.setData: {
                lastOp = “SETD” ;
                // 构建 SetDataResponse
                rsp = new SetDataResponse(rc.stat);
                err = Code. get(rc .err);
                break ;
            }
            ……
        } catch (SessionMovedException e) {
            cnxn.sendCloseSession();
            return ;
        } catch (KeeperException e) {
            // 如果有 KeeperException,则设置 err
            err = e.code();
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error(“Failed to process ” + request, e);
            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request. request;
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer. toHexString(bb.get() & 0xff));
            }
            LOG.error(“Dumping request buffer: 0x” + sb.toString());
            err = Code. MARSHALLINGERROR;
        }
        // 读取最后 zxid
        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        ReplyHeader hdr =
            new ReplyHeader(request. cxid, lastZxid, err.intValue());
 
        zks.serverStats().updateLatency(request.createTime);
        cnxn.updateStatsForResponse(request. cxid, lastZxid, lastOp,
                    request. createTime, System.currentTimeMillis());
 
        try {
            // 发送 Response 给客户端
            cnxn.sendResponse(hdr, rsp, “response”);
            if (closeSession) {
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error(“FIXMSG” ,e);
        }
    } 

————————————– 分割线 ————————————–

Ubuntu 14.04 安装分布式存储 Sheepdog+ZooKeeper  http://www.linuxidc.com/Linux/2014-12/110352.htm

CentOS 6 安装 sheepdog 虚拟机分布式储存  http://www.linuxidc.com/Linux/2013-08/89109.htm

ZooKeeper 集群配置 http://www.linuxidc.com/Linux/2013-06/86348.htm

使用 ZooKeeper 实现分布式共享锁 http://www.linuxidc.com/Linux/2013-06/85550.htm

分布式服务框架 ZooKeeper — 管理分布式环境中的数据 http://www.linuxidc.com/Linux/2013-06/85549.htm

ZooKeeper 集群环境搭建实践 http://www.linuxidc.com/Linux/2013-04/83562.htm

ZooKeeper 服务器集群环境配置实测 http://www.linuxidc.com/Linux/2013-04/83559.htm

ZooKeeper 集群安装 http://www.linuxidc.com/Linux/2012-10/72906.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计26387字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中