Skip to content

Commit

Permalink
Rollover leader epoch when zxid counter is exhausted
Browse files Browse the repository at this point in the history
The rollover procedure:
1. Treats last proposal of an epoch as rollover proposal.
2. Requests from next epoch are proposed normally.
3. Fences next epoch once rollover proposal persisted.
4. Proposals from next epoch will not be written to disk before rollover
   committed.
5. Leader commits rollover proposal once it get quorum ACKs.
6. Blocked new epoch proposals are logged once rollover proposal is
   committed in corresponding nodes.

This results in:
1. No other lead cloud lead using next epoch number once rollover
   proposal is considered committed.
2. No proposals from next epoch will be written to disk before rollover
   proposal is considered committed.

Refs: ZOOKEEPER-1277, ZOOKEEPER-2789, ZOOKEEPER-4870, ZOOKEEPER-4882
  • Loading branch information
kezhuw committed Oct 27, 2024
1 parent 837f86c commit ec38180
Show file tree
Hide file tree
Showing 20 changed files with 1,264 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.zookeeper.server.quorum.LearnerHandler;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.AuthUtil;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -364,6 +365,10 @@ public boolean isQuorum() {
}
}

public boolean isRollover() {
return isQuorum() && ZxidUtils.isLastEpochZxid(zxid);
}

public static String op2String(int op) {
switch (op) {
case OpCode.notification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.server.quorum.ObserverZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -205,15 +208,15 @@ public void run() {
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
handover(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (shouldFlush()) {
if (si.isRollover() && shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
Expand All @@ -224,6 +227,19 @@ public void run() {
LOG.info("SyncRequestProcessor exited!");
}

private void handover(Request request) throws IOException, RequestProcessorException {
if (request.isRollover() && zks instanceof QuorumZooKeeperServer) {
long nextEpoch = ZxidUtils.getEpochFromZxid(request.zxid) + 1;
// Fences upcoming epoch in leader election. So there will be no chance for other peer
// to lead next epoch if this request is considered committed.
((QuorumZooKeeperServer) zks).fenceRolloverEpoch(nextEpoch);
if (zks instanceof ObserverZooKeeperServer) {
((ObserverZooKeeperServer) zks).confirmRolloverEpoch(nextEpoch);
}
}
nextProcessor.processRequest(request);
}

private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
Expand All @@ -242,7 +258,7 @@ private void flush() throws IOException, RequestProcessorException {
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
this.nextProcessor.processRequest(i);
handover(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class Follower extends Learner {

ObserverMaster om;

Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) {
// VisibleForTesting
public Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) {
this.self = Objects.requireNonNull(self);
this.fzk = Objects.requireNonNull(zk);
this.zk = zk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {

private static final Logger LOG = LoggerFactory.getLogger(FollowerZooKeeperServer.class);
private final ParticipantRequestSyncer requestSyncer = new ParticipantRequestSyncer(this, LOG, this::logRequest);

/*
* Pending sync requests
Expand All @@ -57,7 +58,8 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
/**
* @throws IOException
*/
FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
// VisibleForTesting
public FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
this.pendingSyncs = new ConcurrentLinkedQueue<>();
}
Expand All @@ -80,7 +82,13 @@ protected void setupRequestProcessors() {
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();

public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
final Request request = buildRequestToProcess(hdr, txn, digest);
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
requestSyncer.syncRequest(request);
}

private void logRequest(Request request) {
pendingTxns.add(request);
syncProcessor.processRequest(request);
}

Expand Down Expand Up @@ -116,6 +124,7 @@ public void commit(long zxid) {
Request request = pendingTxns.remove();
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
commitProcessor.commit(request);
requestSyncer.finishCommit(request.zxid);
}

public synchronized void sync() {
Expand Down Expand Up @@ -188,20 +197,4 @@ protected void unregisterMetrics() {
rootContext.unregisterGauge("synced_observers");

}

/**
* Build a request for the txn
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1025,14 +1025,13 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol
// we're sending the designated leader, and if the leader is changing the followers are
// responsible for closing the connection - this way we are sure that at least a majority of them
// receive the commit message.
commitAndActivate(zxid, designatedLeader);
commitAndActivate(p, designatedLeader);
informAndActivate(p, designatedLeader);
} else {
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
commit(zxid);
commit(p);
inform(p);
}
zk.commitProcessor.commit(p.request);
if (pendingSyncs.containsKey(zxid)) {
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
sendSync(r);
Expand Down Expand Up @@ -1065,16 +1064,7 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA
LOG.trace("outstanding proposals all");
}

if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
* UPTODATE, so we just ignore the message.
*/
return;
}

if (outstandingProposals.size() == 0) {
if (outstandingProposals.isEmpty()) {
LOG.debug("outstanding is 0");
return;
}
Expand Down Expand Up @@ -1212,25 +1202,30 @@ void sendObserverPacket(QuorumPacket qp) {
long lastCommitted = -1;

/**
* Create a commit packet and send it to all the members of the quorum
*
* @param zxid
* Commit proposal to all connected followers including itself.
*/
public void commit(long zxid) {
public void commit(Proposal p) {
long zxid = p.getZxid();
synchronized (this) {
lastCommitted = zxid;
}

zk.commit(p.request);

QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
}

//commit and send some info
public void commitAndActivate(long zxid, long designatedLeader) {
public void commitAndActivate(Proposal p, long designatedLeader) {
long zxid = p.getZxid();
synchronized (this) {
lastCommitted = zxid;
}

zk.commit(p.request);

byte[] data = new byte[8];
ByteBuffer buffer = ByteBuffer.wrap(data);
buffer.putLong(designatedLeader);
Expand Down Expand Up @@ -1277,35 +1272,17 @@ public long getEpoch() {
return ZxidUtils.getEpochFromZxid(lastProposed);
}

@SuppressWarnings("serial")
public static class XidRolloverException extends Exception {

public XidRolloverException(String message) {
super(message);
}

}

/**
* create a proposal and send it out to all the members
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request) throws XidRolloverException {
public Proposal propose(Request request) {
if (request.isThrottled()) {
LOG.error("Throttled request send as proposal: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}

byte[] data = request.getSerializeData();
proposalStats.setLastBufferSize(data.length);
Expand All @@ -1331,6 +1308,7 @@ public Proposal propose(Request request) throws XidRolloverException {
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
zk.logRequest(request);
return p;
}

Expand Down Expand Up @@ -1465,6 +1443,22 @@ public void reportLookingSid(long sid) {
}
}

/**
* Comparing to {@link #getEpochToPropose(long, long)}, this method does not bump `acceptedEpoch`
* as the rollover txn may not be persisted yet.
*/
public void rolloverLeaderEpoch(long newEpoch) {
synchronized (connectingFollowers) {
if (waitingForNewEpoch) {
throw new IllegalStateException("ZAB is still waiting new epoch");
} else if (newEpoch != epoch + 1) {
String msg = String.format("can not rollover leader epoch to %s, current epoch is %s", newEpoch, epoch);
throw new IllegalArgumentException(msg);
}
epoch = newEpoch;
}
}

@Override
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized (connectingFollowers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
Expand All @@ -44,13 +47,18 @@
* FinalRequestProcessor
*/
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
private static final Logger LOG = LoggerFactory.getLogger(LeaderZooKeeperServer.class);

private ContainerManager containerManager; // guarded by sync

CommitProcessor commitProcessor;

PrepRequestProcessor prepRequestProcessor;

SyncRequestProcessor syncProcessor;
private final ParticipantRequestSyncer requestSyncer =
new ParticipantRequestSyncer(this, LOG, r -> syncProcessor.processRequest(r));

/**
* @throws IOException
*/
Expand All @@ -68,8 +76,10 @@ protected void setupRequestProcessors() {
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
AckRequestProcessor ackProcessor = new AckRequestProcessor(getLeader());
syncProcessor = new SyncRequestProcessor(this, ackProcessor);
syncProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
Expand Down Expand Up @@ -159,6 +169,7 @@ protected synchronized void shutdownComponents() {
if (containerManager != null) {
containerManager.stop();
}
syncProcessor.shutdown();
super.shutdownComponents();
}

Expand All @@ -169,6 +180,21 @@ public int getGlobalOutstandingLimit() {
return globalOutstandingLimit;
}

@Override
public void confirmRolloverEpoch(long newEpoch) {
getLeader().rolloverLeaderEpoch(newEpoch);
super.confirmRolloverEpoch(newEpoch);
}

public void logRequest(Request request) {
requestSyncer.syncRequest(request);
}

public void commit(Request request) {
commitProcessor.commit(request);
requestSyncer.finishCommit(request.zxid);
}

@Override
public void createSessionTracker() {
sessionTracker = new LeaderSessionTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
zk.startup();

long lastCommittedZxid = zk.getLastProcessedZxid();
long lastCommittedEpoch = ZxidUtils.getEpochFromZxid(lastCommittedZxid);
if (ZxidUtils.isLastEpochZxid(lastCommittedZxid)) {
lastCommittedEpoch += 1;
}
LOG.debug("lastCommittedZxid {}, lastCommittedEpoch {} newEpoch {}",
Long.toHexString(lastCommittedZxid), lastCommittedEpoch, newEpoch);
if (lastCommittedEpoch > newEpoch) {
LOG.info("Switch to new leader epoch {} from {}", lastCommittedEpoch, newEpoch);
newEpoch = lastCommittedEpoch;
self.setAcceptedEpoch(newEpoch);
self.setCurrentEpoch(newEpoch);
}

/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ public void run() {
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
LOG.info("Sending diffs last zxid of peer is 0x{}, zxid of leader is 0x{}",
Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid));
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
Expand Down
Loading

0 comments on commit ec38180

Please sign in to comment.