优秀的编程知识分享平台

网站首页 > 技术文章 正文

zookeeper源码分析之五服务端(集群leader)处理请求流程

nanyue 2024-07-31 12:07:21 技术文章 7 ℃

leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer。它规定了请求到达leader时需要经历的路径:

PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor

具体情况可以参看代码:

@Override
 protected void setupRequestProcessors() {
 RequestProcessor finalProcessor = new FinalRequestProcessor(this);
 RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
 commitProcessor = new CommitProcessor(toBeAppliedProcessor,
 Long.toString(getServerId()), false,
 getZooKeeperServerListener());
 commitProcessor.start();
 ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
 commitProcessor);
 proposalProcessor.initialize();
 prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
 prepRequestProcessor.start();
 firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
 setupContainerManager();
 }

让我们一步步分析这些RP都做了什么工作?其中PrepRequestProcessor、FinalRequestProcessor已经在上篇文章中做了分析:zookeeper源码分析之四服务端(单机)处理请求流程

那我们就开始余下的RP吧

1. ProposalRequestProcessor

这个RP仅仅将请求转发到AckRequestProcessor和SyncRequestProcessor上,看具体代码:

public void processRequest(Request request) throws RequestProcessorException {
 // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
 // request.type + " id = " + request.sessionId);
 // request.addRQRec(">prop");
 /* In the following IF-THEN-ELSE block, we process syncs on the leader.
 * If the sync is coming from a follower, then the follower
 * handler adds it to syncHandler. Otherwise, if it is a client of
 * the leader that issued the sync command, then syncHandler won't
 * contain the handler. In this case, we add it to syncHandler, and
 * call processRequest on the next processor.
 */
 if (request instanceof LearnerSyncRequest){
 zks.getLeader().processSync((LearnerSyncRequest)request);
 } else {
 nextProcessor.processRequest(request);
 if (request.getHdr() != null) {
 // We need to sync and get consensus on any transactions
 try {
 zks.getLeader().propose(request);
 } catch (XidRolloverException e) {
 throw new RequestProcessorException(e.getMessage(), e);
 }
 syncProcessor.processRequest(request);
 }
 }
 }

SyncRequestProcessor 我们已经在上文中进行了分析,这里就不在赘述了,那就看看AckRequestProcessor的工作是什么吧?

AckRequestProcessor仅仅将发送过来的请求作为ACk转发给leader。代码见明细:

 /**
 * Forward the request as an ACK to the leader
 */
 public void processRequest(Request request) {
 QuorumPeer self = leader.self;
 if(self != null)
 leader.processAck(self.getId(), request.zxid, null);
 else
 LOG.error("Null QuorumPeer");
 }

leader处理请求如下所示:

 /**
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 *
 * @param zxid, the zxid of the proposal sent out
 * @param sid, the id of the server that sent the ack
 * @param followerAddr
 */
 synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { 
 if (!allowedToCommit) return; // last op committed was a leader change - from now on 
 // the new leader should commit 
 if (LOG.isTraceEnabled()) {
 LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
 for (Proposal p : outstandingProposals.values()) {
 long packetZxid = p.packet.getZxid();
 LOG.trace("outstanding proposal: 0x{}",
 Long.toHexString(packetZxid));
 }
 LOG.trace("outstanding proposals all");
 }
 
 if ((zxid & 0xffffffffL) == 0) {
 /*
 * We no longer process NEWLEADER ack with this method. However,
 * the learner sends an ack back to the leader after it gets
 * UPTODATE, so we just ignore the message.
 */
 return;
 }
 
 
 if (outstandingProposals.size() == 0) {
 if (LOG.isDebugEnabled()) {
 LOG.debug("outstanding is 0");
 }
 return;
 }
 if (lastCommitted >= zxid) {
 if (LOG.isDebugEnabled()) {
 LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
 Long.toHexString(lastCommitted), Long.toHexString(zxid));
 }
 // The proposal has already been committed
 return;
 }
 Proposal p = outstandingProposals.get(zxid);
 if (p == null) {
 LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
 Long.toHexString(zxid), followerAddr);
 return;
 }
 
 p.addAck(sid); 
 /*if (LOG.isDebugEnabled()) {
 LOG.debug("Count for zxid: 0x{} is {}",
 Long.toHexString(zxid), p.ackSet.size());
 }*/
 
 boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
 // If p is a reconfiguration, multiple other operations may be ready to be committed,
 // since operations wait for different sets of acks.
 // Currently we only permit one outstanding reconfiguration at a time
 // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
 // pending all wait for a quorum of old and new config, so its not possible to get enough acks
 // for an operation without getting enough acks for preceding ops. But in the future if multiple
 // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
 // ops may already have enough acks and can be committed, which is what this code does.
 if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
 long curZxid = zxid;
 while (allowedToCommit && hasCommitted && p!=null){
 curZxid++;
 p = outstandingProposals.get(curZxid);
 if (p !=null) hasCommitted = tryToCommit(p, curZxid, null); 
 }
 }
 }
 

调用实现,最终由CommitProcessor 接着处理请求:

 /**
 * @return True if committed, otherwise false.
 * @param a proposal p
 **/
 synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) { 
 // make sure that ops are committed in order. With reconfigurations it is now possible
 // that different operations wait for different sets of acks, and we still want to enforce
 // that they are committed in order. Currently we only permit one outstanding reconfiguration
 // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
 // pending all wait for a quorum of old and new config, so its not possible to get enough acks
 // for an operation without getting enough acks for preceding ops. But in the future if multiple
 // concurrent reconfigs are allowed, this can happen.
 if (outstandingProposals.containsKey(zxid - 1)) return false;
 
 // getting a quorum from all necessary configurations
 if (!p.hasAllQuorums()) {
 return false; 
 }
 
 // commit proposals in order
 if (zxid != lastCommitted+1) { 
 LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
 + " from " + followerAddr + " not first!");
 LOG.warn("First is "
 + (lastCommitted+1));
 } 
 
 // in order to be committed, a proposal must be accepted by a quorum 
 
 outstandingProposals.remove(zxid);
 
 if (p.request != null) {
 toBeApplied.add(p);
 }
 if (p.request == null) {
 LOG.warn("Going to commmit null: " + p);
 } else if (p.request.getHdr().getType() == OpCode.reconfig) { 
 LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); 
 
 //if this server is voter in new config with the same quorum address, 
 //then it will remain the leader
 //otherwise an up-to-date follower will be designated as leader. This saves
 //leader election time, unless the designated leader fails 
 Long designatedLeader = getDesignatedLeader(p, zxid);
 //LOG.warn("designated leader is: " + designatedLeader);
 QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
 
 self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
 
 if (designatedLeader != self.getId()) {
 allowedToCommit = false;
 }
 
 // we're sending the designated leader, and if the leader is changing the followers are 
 // responsible for closing the connection - this way we are sure that at least a majority of them 
 // receive the commit message.
 commitAndActivate(zxid, designatedLeader);
 informAndActivate(p, designatedLeader);
 //turnOffFollowers();
 } else {
 commit(zxid);
 inform(p);
 }
 zk.commitProcessor.commit(p.request);
 if(pendingSyncs.containsKey(zxid)){
 for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
 sendSync(r);
 } 
 } 
 
 return true; 
 }

该程序第一步是发送一个请求到Quorum的所有成员

 /**
 * Create a commit packet and send it to all the members of the quorum
 *
 * @param zxid
 */
 public void commit(long zxid) {
 synchronized(this){
 lastCommitted = zxid;
 }
 QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
 sendPacket(qp);
 }

发送报文如下:

 /**
 * send a packet to all the followers ready to follow
 *
 * @param qp
 * the packet to be sent
 */
 void sendPacket(QuorumPacket qp) {
 synchronized (forwardingFollowers) {
 for (LearnerHandler f : forwardingFollowers) {
 f.queuePacket(qp);
 }
 }
 }

第二步是通知Observer

 /**
 * Create an inform packet and send it to all observers.
 * @param zxid
 * @param proposal
 */
 public void inform(Proposal proposal) {
 QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
 proposal.packet.getData(), null);
 sendObserverPacket(qp);
 }

发送observer程序如下:

 /**
 * send a packet to all observers
 */
 void sendObserverPacket(QuorumPacket qp) {
 for (LearnerHandler f : getObservingLearners()) {
 f.queuePacket(qp);
 }
 }

第三步到

 zk.commitProcessor.commit(p.request);

2. CommitProcessor

CommitProcessor是多线程的,线程间通信通过queue,atomic和wait/notifyAll同步。CommitProcessor扮演一个网关角色,允许请求到剩下的处理管道。在同一瞬间,它支持多个读请求而仅支持一个写请求,这是为了保证写请求在事务中的顺序。

1个commit处理主线程,它监控请求队列,并将请求分发到工作线程,分发过程基于sessionId,这样特定session的读写请求通常分发到同一个线程,因而可以保证运行的顺序。

0~N个工作进程,他们在请求上运行剩下的请求处理管道。如果配置为0个工作线程,主commit线程将会直接运行管道。

经典(默认)线程数是:在32核的机器上,一个commit处理线程和32个工作线程。

多线程的限制:

每个session的请求处理必须是顺序的。

写请求处理必须按照zxid顺序。

必须保证一个session内不会出现写条件竞争,条件竞争可能导致另外一个session的读请求触发监控。

当前实现解决第三个限制,仅仅通过不允许在写请求时允许读进程的处理。

 @Override
 public void run() {
 Request request;
 try {
 while (!stopped) {
 synchronized(this) {
 while (
 !stopped &&
 ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
 (committedRequests.isEmpty() || isProcessingRequest()))) {
 wait();
 }
 }
 /*
 * Processing queuedRequests: Process the next requests until we
 * find one for which we need to wait for a commit. We cannot
 * process a read request while we are processing write request.
 */
 while (!stopped && !isWaitingForCommit() &&
 !isProcessingCommit() &&
 (request = queuedRequests.poll()) != null) {
 if (needCommit(request)) {
 nextPending.set(request);
 } else {
 sendToNextProcessor(request);
 }
 }
 /*
 * Processing committedRequests: check and see if the commit
 * came in for the pending request. We can only commit a
 * request when there is no other request being processed.
 */
 processCommitted();
 }
 } catch (Throwable e) {
 handleException(this.getName(), e);
 }
 LOG.info("CommitProcessor exited loop!");
 }

主逻辑程序如下:

 /*
 * Separated this method from the main run loop
 * for test purposes (ZOOKEEPER-1863)
 */
 protected void processCommitted() {
 Request request;
 if (!stopped && !isProcessingRequest() &&
 (committedRequests.peek() != null)) {
 /*
 * ZOOKEEPER-1863: continue only if there is no new request
 * waiting in queuedRequests or it is waiting for a
 * commit. 
 */
 if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
 return;
 }
 request = committedRequests.poll();
 /*
 * We match with nextPending so that we can move to the
 * next request when it is committed. We also want to
 * use nextPending because it has the cnxn member set
 * properly.
 */
 Request pending = nextPending.get();
 if (pending != null &&
 pending.sessionId == request.sessionId &&
 pending.cxid == request.cxid) {
 // we want to send our version of the request.
 // the pointer to the connection in the request
 pending.setHdr(request.getHdr());
 pending.setTxn(request.getTxn());
 pending.zxid = request.zxid;
 // Set currentlyCommitting so we will block until this
 // completes. Cleared by CommitWorkRequest after
 // nextProcessor returns.
 currentlyCommitting.set(pending);
 nextPending.set(null);
 sendToNextProcessor(pending);
 } else {
 // this request came from someone else so just
 // send the commit packet
 currentlyCommitting.set(request);
 sendToNextProcessor(request);
 }
 } 
 }

启动多线程处理程序

 /**
 * Schedule final request processing; if a worker thread pool is not being
 * used, processing is done directly by this thread.
 */
 private void sendToNextProcessor(Request request) {
 numRequestsProcessing.incrementAndGet();
 workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
 }

真实逻辑是

 /**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
 public void schedule(WorkRequest workRequest, long id) {
 if (stopped) {
 workRequest.cleanup();
 return;
 }
 ScheduledWorkRequest scheduledWorkRequest =
 new ScheduledWorkRequest(workRequest);
 // If we have a worker thread pool, use that; otherwise, do the work
 // directly.
 int size = workers.size();
 if (size > 0) {
 try {
 // make sure to map negative ids as well to [0, size-1]
 int workerNum = ((int) (id % size) + size) % size;
 ExecutorService worker = workers.get(workerNum);
 worker.execute(scheduledWorkRequest);
 } catch (RejectedExecutionException e) {
 LOG.warn("ExecutorService rejected execution", e);
 workRequest.cleanup();
 }
 } else {
 // When there is no worker thread pool, do the work directly
 // and wait for its completion
 scheduledWorkRequest.start();
 try {
 scheduledWorkRequest.join();
 } catch (InterruptedException e) {
 LOG.warn("Unexpected exception", e);
 Thread.currentThread().interrupt();
 }
 }
 }

请求处理线程run方法:

 @Override
 public void run() {
 try {
 // Check if stopped while request was on queue
 if (stopped) {
 workRequest.cleanup();
 return;
 }
 workRequest.doWork();
 } catch (Exception e) {
 LOG.warn("Unexpected exception", e);
 workRequest.cleanup();
 }
 }

调用commitProcessor的doWork方法

 public void doWork() throws RequestProcessorException {
 try {
 nextProcessor.processRequest(request);
 } finally {
 // If this request is the commit request that was blocking
 // the processor, clear.
 currentlyCommitting.compareAndSet(request, null);
 /*
 * Decrement outstanding request count. The processor may be
 * blocked at the moment because it is waiting for the pipeline
 * to drain. In that case, wake it up if there are pending
 * requests.
 */
 if (numRequestsProcessing.decrementAndGet() == 0) {
 if (!queuedRequests.isEmpty() ||
 !committedRequests.isEmpty()) {
 wakeup();
 }
 }
 }
 }

将请求传递给下一个RP:Leader.ToBeAppliedRequestProcessor

3.Leader.ToBeAppliedRequestProcessor

Leader.ToBeAppliedRequestProcessor仅仅维护一个toBeApplied列表。

 /**
 * This request processor simply maintains the toBeApplied list. For
 * this to work next must be a FinalRequestProcessor and
 * FinalRequestProcessor.processRequest MUST process the request
 * synchronously!
 *
 * @param next
 * a reference to the FinalRequestProcessor
 */
 ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
 if (!(next instanceof FinalRequestProcessor)) {
 throw new RuntimeException(ToBeAppliedRequestProcessor.class
 .getName()
 + " must be connected to "
 + FinalRequestProcessor.class.getName()
 + " not "
 + next.getClass().getName());
 }
 this.leader = leader;
 this.next = next;
 }
 /*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
 public void processRequest(Request request) throws RequestProcessorException {
 next.processRequest(request);
 // The only requests that should be on toBeApplied are write
 // requests, for which we will have a hdr. We can't simply use
 // request.zxid here because that is set on read requests to equal
 // the zxid of the last write op.
 if (request.getHdr() != null) {
 long zxid = request.getHdr().getZxid();
 Iterator<Proposal> iter = leader.toBeApplied.iterator();
 if (iter.hasNext()) {
 Proposal p = iter.next();
 if (p.request != null && p.request.zxid == zxid) {
 iter.remove();
 return;
 }
 }
 LOG.error("Committed request not found on toBeApplied: "
 + request);
 }
 }

4. FinalRequestProcessor前文已经说明,本文不在赘述。

小结:从上面的分析可以知道,leader处理请求的顺序分别是:PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor。

请求先通过PrepRequestProcessor接收请求,并进行包装,然后请求类型的不同,设置同享数据;主要负责通知所有follower和observer;CommitProcessor 启动多线程处理请求;Leader.ToBeAppliedRequestProcessor仅仅维护一个toBeApplied列表;

FinalRequestProcessor来作为消息处理器的终结者,发送响应消息,并触发watcher的处理程序。

最近发表
标签列表