Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-3624: Fix flaky QuorumPeerMainTest::testFailedTxnAsPartOfQuorumLoss #2204

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,7 @@ public void run() {
} else {
try {
reconfigFlagClear();
checkSuspended();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,9 @@ public void testWithOnlyMinSessionTimeout() throws Exception {
assertEquals(maxSessionTimeOut, quorumPeer.getMaxSessionTimeout(), "maximumSessionTimeOut is wrong");
}

/**
* Verify that a failed txn in an isolated leader gets truncated after the leader rejoins the quorum.
*/
@Test
public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
Expand All @@ -729,6 +732,8 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
// increase the tick time to delay the leader going to looking
int previousTick = servers.mt[leader].main.quorumPeer.tickTime;
servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
// isolate it from the other quorum members by preventing it from rejoining
servers.mt[leader].getQuorumPeer().setSuspended(true);
// let the previous tick on the leader exhaust itself so the new tick time takes effect
Thread.sleep(previousTick);
LOG.warn("LEADER {}", leader);
Expand All @@ -739,34 +744,18 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
}
}

// 3. start up the followers to form a new quorum
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
servers.mt[i].start();
}
}

// 4. wait one of the follower to be the new leader
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
// Recreate a client session since the previous session was not persisted.
servers.restartClient(i, this);
waitForOne(servers.zk[i], States.CONNECTED);
}
}

// 5. send a create request to old leader and make sure it's synced to disk,
// 3. send a create request to old leader and make sure it's synced to disk,
// which means it acked from itself
try {
servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
} catch (KeeperException e) {
} catch (KeeperException ignored) {
}

// just make sure that we actually did get it in process at the
// leader
// just make sure that we actually did get it in process at the leader
//
// there can be extra sessionClose proposals
assertTrue(outstanding.size() > 0);
assertFalse(outstanding.isEmpty());
Proposal p = findProposalOfType(outstanding, OpCode.create);
LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
assertNotNull(p, "Old leader doesn't have 'create' proposal");
Expand All @@ -782,36 +771,73 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
sleepTime += 100;
}

// 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
LOG.info("Waiting for leader {} to timeout followers", leader);
// 4. start up the followers to form a new quorum
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
servers.mt[i].start();
}
}

// 5. wait for one of the followers to become the new leader
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
// Create a new client session to avoid ConnectionLoss, since the server it connects to has been restarted.
servers.restartClient(i, this);
waitForOne(servers.zk[i], States.CONNECTED);
}
}

// 6. make sure new quorum does not replicate the failed txn
for (int i = 0; i < SERVER_COUNT; i++) {
if (i == leader) {
continue;
}
assertNull(servers.zk[i].exists("/zk" + leader, false),
"server " + i + " should not have /zk" + leader);
}

// resume election to rejoin the cluster
servers.mt[leader].getQuorumPeer().setSuspended(false);

// 7. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
LOG.info("Waiting for leader {} to timeout and rejoin as follower", leader);
sleepTime = 0;
Follower f = servers.mt[leader].main.quorumPeer.follower;
while (f == null || !f.isRunning()) {
if (sleepTime > LEADER_TIMEOUT_MS * 2) {
fail("Took too long for old leader to time out "
while (servers.mt[leader].getQuorumPeer().getPeerState() != QuorumPeer.ServerState.FOLLOWING) {
if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2) {
fail("Took too long for old leader to time out and rejoin "
+ servers.mt[leader].main.quorumPeer.getPeerState());
}
Thread.sleep(100);
sleepTime += 100;
f = servers.mt[leader].main.quorumPeer.follower;
}

int newLeader = servers.findLeader();
// make sure a different leader was elected
assertNotEquals(leader, newLeader);

// 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
servers.mt[leader].shutdown();
servers.mt[leader].start();
// old client session can expire, restart it
// Now all preconditions are met. Let's verify that the failed txn got truncated in the whole cluster.

boolean restarted = false;
servers.restartClient(leader, this);
waitForAll(servers, States.CONNECTED);
waitForOne(servers.zk[leader], States.CONNECTED);
while (true) {
// 7. make sure everything is consistent, that is, the failed txn got truncated in the old leader.
for (int i = 0; i < SERVER_COUNT; i++) {
assertNull(servers.zk[i].exists("/zk" + leader, false),
"server " + i + " should not have /zk" + leader);
}

// 8. check the node exist in previous leader but not others
// make sure everything is consistent
for (int i = 0; i < SERVER_COUNT; i++) {
assertNull(servers.zk[i].exists("/zk" + leader, false),
"server " + i + " should not have /zk" + leader);
if (restarted) {
break;
}

// 8. make sure above holds after restart
servers.mt[leader].shutdown();
servers.mt[leader].start();
// old client session can expire, restart it
servers.restartClient(leader, this);
waitForAll(servers, States.CONNECTED);
restarted = true;
}
}

Expand Down
Loading