diff --git a/include/raft/fixture.h b/include/raft/fixture.h index a32297057..635a9c4b0 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -138,16 +138,22 @@ RAFT_API unsigned raft_fixture_leader_index(struct raft_fixture *f); RAFT_API raft_id raft_fixture_voted_for(struct raft_fixture *f, unsigned i); /** - * Drive the cluster so the @i'th server gets elected as leader. + * Drive the cluster so the @i'th server starts an election but doesn't + * necessarily win it. * * This is achieved by bumping the randomized election timeout of all other - * servers to a very high value, letting the one of the @i'th server expire and - * then stepping the cluster until the election is won. + * servers to a very high value, letting the one of the @i'th server expire. * * There must currently be no leader and no candidate and the given server must * be a voting one. Also, the @i'th server must be connected to a majority of * voting servers. */ +RAFT_API void raft_fixture_start_elect(struct raft_fixture *f, unsigned i); + +/** + * Calls raft_fixture_start_elect, but waits and asserts that the @i'th server + * has become the leader. + */ RAFT_API void raft_fixture_elect(struct raft_fixture *f, unsigned i); /** diff --git a/src/fixture.c b/src/fixture.c index ff806838e..f1ad21dc4 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -1615,7 +1615,7 @@ void raft_fixture_hook(struct raft_fixture *f, raft_fixture_event_cb hook) f->hook = hook; } -void raft_fixture_elect(struct raft_fixture *f, unsigned i) +void raft_fixture_start_elect(struct raft_fixture *f, unsigned i) { struct raft *raft = raft_fixture_get(f, i); unsigned j; @@ -1637,7 +1637,12 @@ void raft_fixture_elect(struct raft_fixture *f, unsigned i) * the minimum possible value compatible with its current state. */ minimizeRandomizedElectionTimeout(f, i); maximizeAllRandomizedElectionTimeoutsExcept(f, i); +} +void raft_fixture_elect(struct raft_fixture *f, unsigned i) +{ + struct raft *raft = raft_fixture_get(f, i); + raft_fixture_start_elect(f, i); raft_fixture_step_until_has_leader(f, ELECTION_TIMEOUT * 20); assert(f->leader_id == raft->id); } diff --git a/src/replication.c b/src/replication.c index 753beda81..526c8cf47 100644 --- a/src/replication.c +++ b/src/replication.c @@ -440,7 +440,8 @@ static size_t updateLastStored(struct raft *r, return i; } -/* Get the request matching the given index and type, if any. */ +/* Get the request matching the given @index and @type, if any. + * The type check is skipped when @type == -1. */ static struct request *getRequest(struct raft *r, const raft_index index, int type) @@ -454,7 +455,9 @@ static struct request *getRequest(struct raft *r, QUEUE_FOREACH (head, &r->leader_state.requests) { req = QUEUE_DATA(head, struct request, queue); if (req->index == index) { - assert(req->type == type); + if (type != -1) { + assert(req->type == type); + } lifecycleRequestEnd(r, req); return req; } @@ -463,9 +466,9 @@ static struct request *getRequest(struct raft *r, } /* Invoked once a disk write request for new entries has been completed. */ -static void appendLeaderCb(struct raft_io_append *req, int status) +static void appendLeaderCb(struct raft_io_append *append, int status) { - struct appendLeader *request = req->data; + struct appendLeader *request = append->data; struct raft *r = request->raft; size_t server_index; raft_index index; @@ -476,19 +479,45 @@ static void appendLeaderCb(struct raft_io_append *req, int status) /* In case of a failed disk write, if we were the leader creating these * entries in the first place, truncate our log too (since we have appended - * these entries to it) and fire the request callback. + * these entries to it) and fire the request callbacks. * * Afterward, convert immediately to follower state, giving the cluster a * chance to elect another leader that doesn't have a full disk (or whatever * caused our write error). */ if (status != 0) { - struct raft_apply *apply; ErrMsgTransfer(r->io->errmsg, r->errmsg, "io"); - apply = - (struct raft_apply *)getRequest(r, request->index, RAFT_COMMAND); - if (apply != NULL) { - if (apply->cb != NULL) { - apply->cb(apply, status, NULL); + for (unsigned i = 0; i < request->n; i++) { + const struct request *req = getRequest(r, request->index + i, -1); + if (!req) { + tracef("no request found at index %llu", request->index + i); + continue; + } + switch (req->type) { + case RAFT_COMMAND: { + struct raft_apply *apply = (struct raft_apply *)req; + if (apply->cb) { + apply->cb(apply, status, NULL); + } + break; + } + case RAFT_BARRIER: { + struct raft_barrier *barrier = (struct raft_barrier *)req; + if (barrier->cb) { + barrier->cb(barrier, status); + } + break; + } + case RAFT_CHANGE: { + struct raft_change *change = (struct raft_change *)req; + if (change->cb) { + change->cb(change, status); + } + break; + } + default: + tracef("unknown request type, shutdown."); + assert(false); + break; } } goto out; diff --git a/test/integration/test_replication.c b/test/integration/test_replication.c index cb1f637b9..7708c6d74 100644 --- a/test/integration/test_replication.c +++ b/test/integration/test_replication.c @@ -1073,15 +1073,23 @@ TEST(replication, resultRetry, setUp, tearDown, 0, NULL) return MUNIT_OK; } +static void applyAssertStatusCb(struct raft_apply *req, int status, void *result) +{ + (void) result; + int status_expected = (int)(intptr_t)(req->data); + munit_assert_int(status_expected, ==, status); +} + /* When the leader fails to write some new entries to disk, it steps down. */ TEST(replication, diskWriteFailure, setUp, tearDown, 0, NULL) { struct fixture *f = data; struct raft_apply *req = munit_malloc(sizeof(*req)); + req->data = (void*)(intptr_t)RAFT_IOERR; BOOTSTRAP_START_AND_ELECT; CLUSTER_IO_FAULT(0, 1, 1); - CLUSTER_APPLY_ADD_X(0, req, 1, NULL); + CLUSTER_APPLY_ADD_X(0, req, 1, applyAssertStatusCb); /* The leader steps down when its disk write fails. */ CLUSTER_STEP_UNTIL_STATE_IS(0, RAFT_FOLLOWER, 2000); free(req); @@ -1179,3 +1187,24 @@ TEST(replication, lastStoredLaggingBehindCommitIndex, setUp, tearDown, 0, NULL) return MUNIT_OK; } + +/* A leader with faulty disk fails to persist the barrier entry upon election. + */ +TEST(replication, failPersistBarrier, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + CLUSTER_GROW; + + /* Server 0 will fail to persist entry 2, a barrier */ + CLUSTER_IO_FAULT(0, 10, 1); + + /* Server 0 gets elected and creates a barrier entry at index 2 */ + CLUSTER_BOOTSTRAP; + CLUSTER_START; + CLUSTER_START_ELECT(0); + + /* Cluster recovers. */ + CLUSTER_STEP_UNTIL_HAS_LEADER(20000); + + return MUNIT_OK; +} diff --git a/test/lib/cluster.h b/test/lib/cluster.h index b25e6ecc4..decfdb3c6 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -356,6 +356,9 @@ /* Elect the I'th server. */ #define CLUSTER_ELECT(I) raft_fixture_elect(&f->cluster, I) +/* Start to elect the I'th server. */ +#define CLUSTER_START_ELECT(I) raft_fixture_start_elect(&f->cluster, I) + /* Depose the current leader */ #define CLUSTER_DEPOSE raft_fixture_depose(&f->cluster)