Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
replication: Cancel correct request type when append fails.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Borderé committed Jul 27, 2023
1 parent 3abaab7 commit 69ed1f2
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 16 deletions.
12 changes: 9 additions & 3 deletions include/raft/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,22 @@ RAFT_API unsigned raft_fixture_leader_index(struct raft_fixture *f);
RAFT_API raft_id raft_fixture_voted_for(struct raft_fixture *f, unsigned i);

/**
* Drive the cluster so the @i'th server gets elected as leader.
* Drive the cluster so the @i'th server starts an election but doesn't
* necessarily win it.
*
* This is achieved by bumping the randomized election timeout of all other
* servers to a very high value, letting the one of the @i'th server expire and
* then stepping the cluster until the election is won.
* servers to a very high value, letting the one of the @i'th server expire.
*
* There must currently be no leader and no candidate and the given server must
* be a voting one. Also, the @i'th server must be connected to a majority of
* voting servers.
*/
RAFT_API void raft_fixture_start_elect(struct raft_fixture *f, unsigned i);

/**
* Calls raft_fixture_start_elect, but waits and asserts that the @i'th server
* has become the leader.
*/
RAFT_API void raft_fixture_elect(struct raft_fixture *f, unsigned i);

/**
Expand Down
7 changes: 6 additions & 1 deletion src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,7 @@ void raft_fixture_hook(struct raft_fixture *f, raft_fixture_event_cb hook)
f->hook = hook;
}

void raft_fixture_elect(struct raft_fixture *f, unsigned i)
void raft_fixture_start_elect(struct raft_fixture *f, unsigned i)
{
struct raft *raft = raft_fixture_get(f, i);
unsigned j;
Expand All @@ -1637,7 +1637,12 @@ void raft_fixture_elect(struct raft_fixture *f, unsigned i)
* the minimum possible value compatible with its current state. */
minimizeRandomizedElectionTimeout(f, i);
maximizeAllRandomizedElectionTimeoutsExcept(f, i);
}

void raft_fixture_elect(struct raft_fixture *f, unsigned i)
{
struct raft *raft = raft_fixture_get(f, i);
raft_fixture_start_elect(f, i);
raft_fixture_step_until_has_leader(f, ELECTION_TIMEOUT * 20);
assert(f->leader_id == raft->id);
}
Expand Down
51 changes: 40 additions & 11 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ static size_t updateLastStored(struct raft *r,
return i;
}

/* Get the request matching the given index and type, if any. */
/* Get the request matching the given @index and @type, if any.
* The type check is skipped when @type == -1. */
static struct request *getRequest(struct raft *r,
const raft_index index,
int type)
Expand All @@ -454,7 +455,9 @@ static struct request *getRequest(struct raft *r,
QUEUE_FOREACH (head, &r->leader_state.requests) {
req = QUEUE_DATA(head, struct request, queue);
if (req->index == index) {
assert(req->type == type);
if (type != -1) {
assert(req->type == type);
}
lifecycleRequestEnd(r, req);
return req;
}
Expand All @@ -463,9 +466,9 @@ static struct request *getRequest(struct raft *r,
}

/* Invoked once a disk write request for new entries has been completed. */
static void appendLeaderCb(struct raft_io_append *req, int status)
static void appendLeaderCb(struct raft_io_append *append, int status)
{
struct appendLeader *request = req->data;
struct appendLeader *request = append->data;
struct raft *r = request->raft;
size_t server_index;
raft_index index;
Expand All @@ -476,19 +479,45 @@ static void appendLeaderCb(struct raft_io_append *req, int status)

/* In case of a failed disk write, if we were the leader creating these
* entries in the first place, truncate our log too (since we have appended
* these entries to it) and fire the request callback.
* these entries to it) and fire the request callbacks.
*
* Afterward, convert immediately to follower state, giving the cluster a
* chance to elect another leader that doesn't have a full disk (or whatever
* caused our write error). */
if (status != 0) {
struct raft_apply *apply;
ErrMsgTransfer(r->io->errmsg, r->errmsg, "io");
apply =
(struct raft_apply *)getRequest(r, request->index, RAFT_COMMAND);
if (apply != NULL) {
if (apply->cb != NULL) {
apply->cb(apply, status, NULL);
for (unsigned i = 0; i < request->n; i++) {
const struct request *req = getRequest(r, request->index + i, -1);
if (!req) {
tracef("no request found at index %llu", request->index + i);
continue;

Check warning on line 493 in src/replication.c

View check run for this annotation

Codecov / codecov/patch

src/replication.c#L493

Added line #L493 was not covered by tests
}
switch (req->type) {
case RAFT_COMMAND: {
struct raft_apply *apply = (struct raft_apply *)req;
if (apply->cb) {
apply->cb(apply, status, NULL);
}
break;
}
case RAFT_BARRIER: {
struct raft_barrier *barrier = (struct raft_barrier *)req;
if (barrier->cb) {
barrier->cb(barrier, status);
}
break;
}
case RAFT_CHANGE: {
struct raft_change *change = (struct raft_change *)req;

Check warning on line 511 in src/replication.c

View check run for this annotation

Codecov / codecov/patch

src/replication.c#L510-L511

Added lines #L510 - L511 were not covered by tests
if (change->cb) {
change->cb(change, status);

Check warning on line 513 in src/replication.c

View check run for this annotation

Codecov / codecov/patch

src/replication.c#L513

Added line #L513 was not covered by tests
}
break;

Check warning on line 515 in src/replication.c

View check run for this annotation

Codecov / codecov/patch

src/replication.c#L515

Added line #L515 was not covered by tests
}
default:

Check warning on line 517 in src/replication.c

View check run for this annotation

Codecov / codecov/patch

src/replication.c#L517

Added line #L517 was not covered by tests
tracef("unknown request type, shutdown.");
assert(false);

Check warning on line 519 in src/replication.c

View check run for this annotation

Codecov / codecov/patch

src/replication.c#L519

Added line #L519 was not covered by tests
break;
}
}
goto out;
Expand Down
31 changes: 30 additions & 1 deletion test/integration/test_replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1073,15 +1073,23 @@ TEST(replication, resultRetry, setUp, tearDown, 0, NULL)
return MUNIT_OK;
}

static void applyAssertStatusCb(struct raft_apply *req, int status, void *result)
{
(void) result;
int status_expected = (int)(intptr_t)(req->data);
munit_assert_int(status_expected, ==, status);
}

/* When the leader fails to write some new entries to disk, it steps down. */
TEST(replication, diskWriteFailure, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct raft_apply *req = munit_malloc(sizeof(*req));
req->data = (void*)(intptr_t)RAFT_IOERR;
BOOTSTRAP_START_AND_ELECT;

CLUSTER_IO_FAULT(0, 1, 1);
CLUSTER_APPLY_ADD_X(0, req, 1, NULL);
CLUSTER_APPLY_ADD_X(0, req, 1, applyAssertStatusCb);
/* The leader steps down when its disk write fails. */
CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_FOLLOWER, 2000);
free(req);
Expand Down Expand Up @@ -1179,3 +1187,24 @@ TEST(replication, lastStoredLaggingBehindCommitIndex, setUp, tearDown, 0, NULL)

return MUNIT_OK;
}

/* A leader with faulty disk fails to persist the barrier entry upon election.
*/
TEST(replication, failPersistBarrier, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
CLUSTER_GROW;

/* Server 0 will fail to persist entry 2, a barrier */
CLUSTER_IO_FAULT(0, 10, 1);

/* Server 0 gets elected and creates a barrier entry at index 2 */
CLUSTER_BOOTSTRAP;
CLUSTER_START;
CLUSTER_START_ELECT(0);

/* Cluster recovers. */
CLUSTER_STEP_UNTIL_HAS_LEADER(20000);

return MUNIT_OK;
}
3 changes: 3 additions & 0 deletions test/lib/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@
/* Elect the I'th server. */
#define CLUSTER_ELECT(I) raft_fixture_elect(&f->cluster, I)

/* Start to elect the I'th server. */
#define CLUSTER_START_ELECT(I) raft_fixture_start_elect(&f->cluster, I)

/* Depose the current leader */
#define CLUSTER_DEPOSE raft_fixture_depose(&f->cluster)

Expand Down

0 comments on commit 69ed1f2

Please sign in to comment.