diff --git a/Makefile.am b/Makefile.am index c0cb74d83..024ed5a9e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,6 +21,7 @@ libraft_la_SOURCES = \ src/election.c \ src/entry.c \ src/err.c \ + src/flags.c \ src/heap.c \ src/lifecycle.c \ src/log.c \ @@ -65,6 +66,7 @@ test_unit_core_SOURCES = \ src/compress.c \ src/configuration.c \ src/err.c \ + src/flags.c \ src/heap.c \ src/log.c \ test/unit/main_core.c \ @@ -72,6 +74,7 @@ test_unit_core_SOURCES = \ test/unit/test_compress.c \ test/unit/test_configuration.c \ test/unit/test_err.c \ + test/unit/test_flags.c \ test/unit/test_log.c \ test/unit/test_queue.c test_unit_core_CFLAGS = $(AM_CFLAGS) -Wno-conversion diff --git a/include/raft.h b/include/raft.h index 02c64f629..cf304f69f 100644 --- a/include/raft.h +++ b/include/raft.h @@ -79,6 +79,11 @@ typedef unsigned long long raft_index; */ typedef unsigned long long raft_time; +/** + * Hold the features a raft node is capable of. + */ +typedef uint64_t raft_flags; + /** * A data buffer. */ @@ -264,8 +269,9 @@ struct raft_append_entries_result raft_term term; /* Receiver's current_term. */ raft_index rejected; /* If non-zero, the index that was rejected. */ raft_index last_log_index; /* Receiver's last log entry index, as hint. */ + raft_flags features; /* Feature flags. */ }; -#define RAFT_APPEND_ENTRIES_RESULT_VERSION 0 +#define RAFT_APPEND_ENTRIES_RESULT_VERSION 1 /** * Hold the arguments of an InstallSnapshot RPC (figure 5.3). diff --git a/src/flags.c b/src/flags.c new file mode 100644 index 000000000..d075c31d1 --- /dev/null +++ b/src/flags.c @@ -0,0 +1,16 @@ +#include "flags.h" + +inline raft_flags flagsSet(raft_flags in, raft_flags flags) +{ + return in | flags; +} + +inline raft_flags flagsClear(raft_flags in, raft_flags flags) +{ + return in & (~flags); +} + +inline bool flagsIsSet(raft_flags in, raft_flags flag) +{ + return (bool)(in & flag); +} diff --git a/src/flags.h b/src/flags.h new file mode 100644 index 000000000..5da15ccd9 --- /dev/null +++ b/src/flags.h @@ -0,0 +1,20 @@ +#ifndef FLAGS_H_ +#define FLAGS_H_ + +#include "../include/raft.h" + +#define RAFT_DEFAULT_FEATURE_FLAGS (0) + +/* Adds the flags @flags to @in and returns the new flags. Multiple flags should + * be combined using the `|` operator. */ +raft_flags flagsSet(raft_flags in, raft_flags flags); + +/* Clears the flags @flags from @in and returns the new flags. Multiple flags + * should be combined using the `|` operator. */ +raft_flags flagsClear(raft_flags in, raft_flags flags); + +/* Returns `true` if the single flag @flag is set in @in, otherwise returns + * `false`. */ +bool flagsIsSet(raft_flags in, raft_flags flag); + +#endif /* FLAGS_H */ diff --git a/src/progress.c b/src/progress.c index 95bcb6e94..0b342e909 100644 --- a/src/progress.c +++ b/src/progress.c @@ -25,6 +25,7 @@ static void initProgress(struct raft_progress *p, raft_index last_index) p->snapshot_last_send = 0; p->recent_recv = false; p->state = PROGRESS__PROBE; + p->features = 0; } int progressBuildArray(struct raft *r) @@ -174,6 +175,18 @@ void progressMarkRecentRecv(struct raft *r, const unsigned i) r->leader_state.progress[i].recent_recv = true; } +inline void progressSetFeatures(struct raft *r, + const unsigned i, + raft_flags features) +{ + r->leader_state.progress[i].features = features; +} + +inline raft_flags progressGetFeatures(struct raft *r, const unsigned i) +{ + return r->leader_state.progress[i].features; +} + bool progressGetRecentRecv(const struct raft *r, const unsigned i) { return r->leader_state.progress[i].recent_recv; diff --git a/src/progress.h b/src/progress.h index 6eb17323a..e229fc187 100644 --- a/src/progress.h +++ b/src/progress.h @@ -23,7 +23,8 @@ struct raft_progress raft_index snapshot_index; /* Last index of most recent snapshot sent. */ raft_time last_send; /* Timestamp of last AppendEntries RPC. */ raft_time snapshot_last_send; /* Timestamp of last InstallSnaphot RPC. */ - bool recent_recv; /* A msg was received within election timeout. */ + bool recent_recv; /* A msg was received within election timeout. */ + raft_flags features; /* What the server is capable of. */ }; /* Create and initialize the array of progress objects used by the leader to * @@ -122,4 +123,10 @@ bool progressMaybeDecrement(struct raft *r, /* Return true if match_index is equal or higher than the snapshot_index. */ bool progressSnapshotDone(struct raft *r, unsigned i); +/* Sets the feature flags of a node. */ +void progressSetFeatures(struct raft *r, const unsigned i, raft_flags features); + +/* Gets the feature flags of a node. */ +raft_flags progressGetFeatures(struct raft *r, const unsigned i); + #endif /* PROGRESS_H_ */ diff --git a/src/raft.c b/src/raft.c index a7017ffdf..1dc85ef56 100644 --- a/src/raft.c +++ b/src/raft.c @@ -8,6 +8,7 @@ #include "convert.h" #include "election.h" #include "err.h" +#include "flags.h" #include "heap.h" #include "log.h" #include "membership.h" diff --git a/src/recv_append_entries.c b/src/recv_append_entries.c index 54a1f18a6..c265b6361 100644 --- a/src/recv_append_entries.c +++ b/src/recv_append_entries.c @@ -3,6 +3,7 @@ #include "assert.h" #include "convert.h" #include "entry.h" +#include "flags.h" #include "heap.h" #include "log.h" #include "recv.h" @@ -42,6 +43,7 @@ int recvAppendEntries(struct raft *r, result->rejected = args->prev_log_index; result->last_log_index = logLastIndex(r->log); result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION; + result->features = RAFT_DEFAULT_FEATURE_FLAGS; rv = recvEnsureMatchingTerms(r, args->term, &match); if (rv != 0) { diff --git a/src/recv_install_snapshot.c b/src/recv_install_snapshot.c index 4f01d9a74..4f8b4b696 100644 --- a/src/recv_install_snapshot.c +++ b/src/recv_install_snapshot.c @@ -2,6 +2,7 @@ #include "assert.h" #include "convert.h" +#include "flags.h" #include "log.h" #include "recv.h" #include "replication.h" @@ -37,6 +38,7 @@ int recvInstallSnapshot(struct raft *r, result->rejected = args->last_index; result->last_log_index = logLastIndex(r->log); result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION; + result->features = RAFT_DEFAULT_FEATURE_FLAGS; rv = recvEnsureMatchingTerms(r, args->term, &match); if (rv != 0) { diff --git a/src/replication.c b/src/replication.c index 61735c673..753beda81 100644 --- a/src/replication.c +++ b/src/replication.c @@ -8,6 +8,7 @@ #include "error.h" #endif #include "err.h" +#include "flags.h" #include "heap.h" #include "lifecycle.h" #include "log.h" @@ -685,6 +686,8 @@ int replicationUpdate(struct raft *r, progressMarkRecentRecv(r, i); + progressSetFeatures(r, i, result->features); + /* If the RPC failed because of a log mismatch, retry. * * From Figure 3.1: @@ -850,6 +853,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status) result.term = r->current_term; result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION; + result.features = RAFT_DEFAULT_FEATURE_FLAGS; if (status != 0) { if (r->state != RAFT_FOLLOWER) { tracef("local server is not follower -> ignore I/O failure"); @@ -1219,6 +1223,7 @@ static void installSnapshotCb(struct raft_io_snapshot_put *req, int status) result.term = r->current_term; result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION; + result.features = RAFT_DEFAULT_FEATURE_FLAGS; result.rejected = 0; /* If we are shutting down, let's discard the result. */ diff --git a/src/uv_encoding.c b/src/uv_encoding.c index 3ee5cd08c..8e770f6bf 100644 --- a/src/uv_encoding.c +++ b/src/uv_encoding.c @@ -51,13 +51,18 @@ static size_t sizeofAppendEntries(const struct raft_append_entries *p) 16 * p->n_entries /* One header per entry */; } -static size_t sizeofAppendEntriesResult(void) +static size_t sizeofAppendEntriesResultV0(void) { return sizeof(uint64_t) + /* Term. */ sizeof(uint64_t) + /* Success. */ sizeof(uint64_t) /* Last log index. */; } +static size_t sizeofAppendEntriesResult(void) +{ + return sizeofAppendEntriesResultV0() + sizeof(uint64_t) /* 64 bit Flags. */; +} + static size_t sizeofInstallSnapshot(const struct raft_install_snapshot *p) { size_t conf_size = configurationEncodedSize(&p->conf); @@ -141,6 +146,7 @@ static void encodeAppendEntriesResult( bytePut64(&cursor, p->term); bytePut64(&cursor, p->rejected); bytePut64(&cursor, p->last_log_index); + bytePut64(&cursor, p->features); } static void encodeInstallSnapshot(const struct raft_install_snapshot *p, @@ -435,6 +441,11 @@ static void decodeAppendEntriesResult(const uv_buf_t *buf, p->term = byteGet64(&cursor); p->rejected = byteGet64(&cursor); p->last_log_index = byteGet64(&cursor); + p->features = 0; + if (buf->len > sizeofAppendEntriesResultV0()) { + p->version = 1; + p->features = byteGet64(&cursor); + } } static int decodeInstallSnapshot(const uv_buf_t *buf, diff --git a/test/integration/test_replication.c b/test/integration/test_replication.c index c0233c410..cb1f637b9 100644 --- a/test/integration/test_replication.c +++ b/test/integration/test_replication.c @@ -1,4 +1,5 @@ #include "../../src/configuration.h" +#include "../../src/flags.h" #include "../../src/progress.h" #include "../lib/cluster.h" #include "../lib/runner.h" @@ -126,6 +127,40 @@ TEST(replication, sendInitialHeartbeat, setUp, tearDown, 0, NULL) return MUNIT_OK; } +/* After receiving an AppendEntriesResult, a leader has set the feature flags of + * a node. */ +TEST(replication, receiveFlags, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + struct raft *raft; + CLUSTER_BOOTSTRAP; + CLUSTER_START; + + /* Server 0 becomes leader and sends the initial heartbeat. */ + CLUSTER_STEP_N(24); + ASSERT_LEADER(0); + ASSERT_TIME(1030); + + /* Flags is empty */ + raft = CLUSTER_RAFT(0); + munit_assert_ullong(raft->leader_state.progress[1].features, ==, 0); + + raft = CLUSTER_RAFT(1); + /* Server 1 receives the first heartbeat. */ + CLUSTER_STEP_N(4); + munit_assert_int(raft->election_timer_start, ==, 1045); + munit_assert_int(CLUSTER_N_RECV(1, RAFT_IO_APPEND_ENTRIES), ==, 1); + + /* Server 0 receives the reply to the heartbeat. */ + CLUSTER_STEP_N(2); + munit_assert_int(CLUSTER_N_RECV(0, RAFT_IO_APPEND_ENTRIES_RESULT), ==, 1); + raft = CLUSTER_RAFT(0); + munit_assert_ullong(raft->leader_state.progress[1].features, ==, + RAFT_DEFAULT_FEATURE_FLAGS); + + return MUNIT_OK; +} + /* A leader keeps sending heartbeat messages at regular intervals to * maintain leadership. */ TEST(replication, sendFollowupHeartbeat, setUp, tearDown, 0, NULL) diff --git a/test/unit/test_flags.c b/test/unit/test_flags.c new file mode 100644 index 000000000..e156d9d62 --- /dev/null +++ b/test/unit/test_flags.c @@ -0,0 +1,97 @@ +#include "../../src/flags.h" +#include "../lib/runner.h" + +/****************************************************************************** + * + * flags + * + *****************************************************************************/ + +SUITE(flags) + +TEST(flags, empty, NULL, NULL, 0, NULL) +{ + raft_flags flags = 0; + for (int i = 0; i < 64; i++) { + munit_assert_false(flagsIsSet(flags, ((raft_flags)1) << i)); + } + return MUNIT_OK; +} + +TEST(flags, setClear, NULL, NULL, 0, NULL) +{ + raft_flags flags = 0; + raft_flags flag = 0; + for (int i = 0; i < 64; i++) { + flag = ((raft_flags)1) << i; + flags = flagsSet(flags, flag); + munit_assert_true(flagsIsSet(flags, flag)); + flags = flagsClear(flags, flag); + munit_assert_false(flagsIsSet(flags, flag)); + munit_assert_true(flags == 0); + } + return MUNIT_OK; +} + +TEST(flags, setMultipleClearMultiple, NULL, NULL, 0, NULL) +{ + raft_flags in = 0; + raft_flags out; + raft_flags flags = (raft_flags)(1 | 1 << 4 | 1 << 13 | (raft_flags)1 << 40 | + (raft_flags)1 << 63); + out = flagsSet(in, flags); + /* clang-format off */ + int positions[64] = { + 1, 0, 0, 0, 1, 0, 0, 0, // 0th and 4th + 0, 0, 0, 0, 0, 1, 0, 0, // 13th + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, // 40th + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 1, // 63th + }; + /* clang-format on */ + for (unsigned i = 0; i < 64; i++) { + if (positions[i]) { + munit_assert_true(flagsIsSet(out, (raft_flags)1 << i)); + } else { + munit_assert_false(flagsIsSet(out, (raft_flags)1 << i)); + } + } + out = flagsClear(out, flags); + munit_assert_true(out == 0); + return MUNIT_OK; +} + +TEST(flags, setMultipleClearSingle, NULL, NULL, 0, NULL) +{ + raft_flags in = 0; + raft_flags out; + raft_flags flags = (raft_flags)(1 << 3 | 1 << 5 | 1 << 18 | + (raft_flags)1 << 32 | (raft_flags)1 << 35); + out = flagsSet(in, flags); + /* clang-format off */ + int positions[64] = { + 0, 0, 0, 1, 0, 1, 0, 0, // 3rd and 5th + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0, 0, 0, // 18th + 0, 0, 0, 0, 0, 0, 0, 0, + 1, 0, 0, 1, 0, 0, 0, 0, // 32rd 35th + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + }; + /* clang-format on */ + for (unsigned i = 0; i < 64; i++) { + if (positions[i]) { + munit_assert_true(flagsIsSet(out, (raft_flags)1 << i)); + } else { + munit_assert_false(flagsIsSet(out, (raft_flags)1 << i)); + } + } + out = flagsClear(out, (raft_flags)1 << 32); + munit_assert_true( + out == (raft_flags)(1 << 3 | 1 << 5 | 1 << 18 | (raft_flags)1 << 35)); + return MUNIT_OK; +}