Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #427 from MathieuBordere/flags
Browse files Browse the repository at this point in the history
Flags
  • Loading branch information
Mathieu Borderé authored Jun 6, 2023
2 parents 529c5c8 + 628a743 commit 825159c
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 3 deletions.
3 changes: 3 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ libraft_la_SOURCES = \
src/election.c \
src/entry.c \
src/err.c \
src/flags.c \
src/heap.c \
src/lifecycle.c \
src/log.c \
Expand Down Expand Up @@ -65,13 +66,15 @@ test_unit_core_SOURCES = \
src/compress.c \
src/configuration.c \
src/err.c \
src/flags.c \
src/heap.c \
src/log.c \
test/unit/main_core.c \
test/unit/test_byte.c \
test/unit/test_compress.c \
test/unit/test_configuration.c \
test/unit/test_err.c \
test/unit/test_flags.c \
test/unit/test_log.c \
test/unit/test_queue.c
test_unit_core_CFLAGS = $(AM_CFLAGS) -Wno-conversion
Expand Down
8 changes: 7 additions & 1 deletion include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ typedef unsigned long long raft_index;
*/
typedef unsigned long long raft_time;

/**
* Hold the features a raft node is capable of.
*/
typedef uint64_t raft_flags;

/**
* A data buffer.
*/
Expand Down Expand Up @@ -264,8 +269,9 @@ struct raft_append_entries_result
raft_term term; /* Receiver's current_term. */
raft_index rejected; /* If non-zero, the index that was rejected. */
raft_index last_log_index; /* Receiver's last log entry index, as hint. */
raft_flags features; /* Feature flags. */
};
#define RAFT_APPEND_ENTRIES_RESULT_VERSION 0
#define RAFT_APPEND_ENTRIES_RESULT_VERSION 1

/**
* Hold the arguments of an InstallSnapshot RPC (figure 5.3).
Expand Down
16 changes: 16 additions & 0 deletions src/flags.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "flags.h"

inline raft_flags flagsSet(raft_flags in, raft_flags flags)
{
return in | flags;
}

inline raft_flags flagsClear(raft_flags in, raft_flags flags)
{
return in & (~flags);
}

inline bool flagsIsSet(raft_flags in, raft_flags flag)
{
return (bool)(in & flag);
}
20 changes: 20 additions & 0 deletions src/flags.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef FLAGS_H_
#define FLAGS_H_

#include "../include/raft.h"

#define RAFT_DEFAULT_FEATURE_FLAGS (0)

/* Adds the flags @flags to @in and returns the new flags. Multiple flags should
* be combined using the `|` operator. */
raft_flags flagsSet(raft_flags in, raft_flags flags);

/* Clears the flags @flags from @in and returns the new flags. Multiple flags
* should be combined using the `|` operator. */
raft_flags flagsClear(raft_flags in, raft_flags flags);

/* Returns `true` if the single flag @flag is set in @in, otherwise returns
* `false`. */
bool flagsIsSet(raft_flags in, raft_flags flag);

#endif /* FLAGS_H */
13 changes: 13 additions & 0 deletions src/progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ static void initProgress(struct raft_progress *p, raft_index last_index)
p->snapshot_last_send = 0;
p->recent_recv = false;
p->state = PROGRESS__PROBE;
p->features = 0;
}

int progressBuildArray(struct raft *r)
Expand Down Expand Up @@ -174,6 +175,18 @@ void progressMarkRecentRecv(struct raft *r, const unsigned i)
r->leader_state.progress[i].recent_recv = true;
}

inline void progressSetFeatures(struct raft *r,
const unsigned i,
raft_flags features)
{
r->leader_state.progress[i].features = features;
}

inline raft_flags progressGetFeatures(struct raft *r, const unsigned i)
{
return r->leader_state.progress[i].features;
}

bool progressGetRecentRecv(const struct raft *r, const unsigned i)
{
return r->leader_state.progress[i].recent_recv;
Expand Down
9 changes: 8 additions & 1 deletion src/progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ struct raft_progress
raft_index snapshot_index; /* Last index of most recent snapshot sent. */
raft_time last_send; /* Timestamp of last AppendEntries RPC. */
raft_time snapshot_last_send; /* Timestamp of last InstallSnaphot RPC. */
bool recent_recv; /* A msg was received within election timeout. */
bool recent_recv; /* A msg was received within election timeout. */
raft_flags features; /* What the server is capable of. */
};

/* Create and initialize the array of progress objects used by the leader to *
Expand Down Expand Up @@ -122,4 +123,10 @@ bool progressMaybeDecrement(struct raft *r,
/* Return true if match_index is equal or higher than the snapshot_index. */
bool progressSnapshotDone(struct raft *r, unsigned i);

/* Sets the feature flags of a node. */
void progressSetFeatures(struct raft *r, const unsigned i, raft_flags features);

/* Gets the feature flags of a node. */
raft_flags progressGetFeatures(struct raft *r, const unsigned i);

#endif /* PROGRESS_H_ */
1 change: 1 addition & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "convert.h"
#include "election.h"
#include "err.h"
#include "flags.h"
#include "heap.h"
#include "log.h"
#include "membership.h"
Expand Down
2 changes: 2 additions & 0 deletions src/recv_append_entries.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "assert.h"
#include "convert.h"
#include "entry.h"
#include "flags.h"
#include "heap.h"
#include "log.h"
#include "recv.h"
Expand Down Expand Up @@ -42,6 +43,7 @@ int recvAppendEntries(struct raft *r,
result->rejected = args->prev_log_index;
result->last_log_index = logLastIndex(r->log);
result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result->features = RAFT_DEFAULT_FEATURE_FLAGS;

rv = recvEnsureMatchingTerms(r, args->term, &match);
if (rv != 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/recv_install_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "assert.h"
#include "convert.h"
#include "flags.h"
#include "log.h"
#include "recv.h"
#include "replication.h"
Expand Down Expand Up @@ -37,6 +38,7 @@ int recvInstallSnapshot(struct raft *r,
result->rejected = args->last_index;
result->last_log_index = logLastIndex(r->log);
result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result->features = RAFT_DEFAULT_FEATURE_FLAGS;

rv = recvEnsureMatchingTerms(r, args->term, &match);
if (rv != 0) {
Expand Down
5 changes: 5 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "error.h"
#endif
#include "err.h"
#include "flags.h"
#include "heap.h"
#include "lifecycle.h"
#include "log.h"
Expand Down Expand Up @@ -685,6 +686,8 @@ int replicationUpdate(struct raft *r,

progressMarkRecentRecv(r, i);

progressSetFeatures(r, i, result->features);

/* If the RPC failed because of a log mismatch, retry.
*
* From Figure 3.1:
Expand Down Expand Up @@ -850,6 +853,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status)

result.term = r->current_term;
result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result.features = RAFT_DEFAULT_FEATURE_FLAGS;
if (status != 0) {
if (r->state != RAFT_FOLLOWER) {
tracef("local server is not follower -> ignore I/O failure");
Expand Down Expand Up @@ -1219,6 +1223,7 @@ static void installSnapshotCb(struct raft_io_snapshot_put *req, int status)

result.term = r->current_term;
result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result.features = RAFT_DEFAULT_FEATURE_FLAGS;
result.rejected = 0;

/* If we are shutting down, let's discard the result. */
Expand Down
13 changes: 12 additions & 1 deletion src/uv_encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,18 @@ static size_t sizeofAppendEntries(const struct raft_append_entries *p)
16 * p->n_entries /* One header per entry */;
}

static size_t sizeofAppendEntriesResult(void)
static size_t sizeofAppendEntriesResultV0(void)
{
return sizeof(uint64_t) + /* Term. */
sizeof(uint64_t) + /* Success. */
sizeof(uint64_t) /* Last log index. */;
}

static size_t sizeofAppendEntriesResult(void)
{
return sizeofAppendEntriesResultV0() + sizeof(uint64_t) /* 64 bit Flags. */;
}

static size_t sizeofInstallSnapshot(const struct raft_install_snapshot *p)
{
size_t conf_size = configurationEncodedSize(&p->conf);
Expand Down Expand Up @@ -141,6 +146,7 @@ static void encodeAppendEntriesResult(
bytePut64(&cursor, p->term);
bytePut64(&cursor, p->rejected);
bytePut64(&cursor, p->last_log_index);
bytePut64(&cursor, p->features);
}

static void encodeInstallSnapshot(const struct raft_install_snapshot *p,
Expand Down Expand Up @@ -435,6 +441,11 @@ static void decodeAppendEntriesResult(const uv_buf_t *buf,
p->term = byteGet64(&cursor);
p->rejected = byteGet64(&cursor);
p->last_log_index = byteGet64(&cursor);
p->features = 0;
if (buf->len > sizeofAppendEntriesResultV0()) {
p->version = 1;
p->features = byteGet64(&cursor);
}
}

static int decodeInstallSnapshot(const uv_buf_t *buf,
Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_replication.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "../../src/configuration.h"
#include "../../src/flags.h"
#include "../../src/progress.h"
#include "../lib/cluster.h"
#include "../lib/runner.h"
Expand Down Expand Up @@ -126,6 +127,40 @@ TEST(replication, sendInitialHeartbeat, setUp, tearDown, 0, NULL)
return MUNIT_OK;
}

/* After receiving an AppendEntriesResult, a leader has set the feature flags of
* a node. */
TEST(replication, receiveFlags, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct raft *raft;
CLUSTER_BOOTSTRAP;
CLUSTER_START;

/* Server 0 becomes leader and sends the initial heartbeat. */
CLUSTER_STEP_N(24);
ASSERT_LEADER(0);
ASSERT_TIME(1030);

/* Flags is empty */
raft = CLUSTER_RAFT(0);
munit_assert_ullong(raft->leader_state.progress[1].features, ==, 0);

raft = CLUSTER_RAFT(1);
/* Server 1 receives the first heartbeat. */
CLUSTER_STEP_N(4);
munit_assert_int(raft->election_timer_start, ==, 1045);
munit_assert_int(CLUSTER_N_RECV(1, RAFT_IO_APPEND_ENTRIES), ==, 1);

/* Server 0 receives the reply to the heartbeat. */
CLUSTER_STEP_N(2);
munit_assert_int(CLUSTER_N_RECV(0, RAFT_IO_APPEND_ENTRIES_RESULT), ==, 1);
raft = CLUSTER_RAFT(0);
munit_assert_ullong(raft->leader_state.progress[1].features, ==,
RAFT_DEFAULT_FEATURE_FLAGS);

return MUNIT_OK;
}

/* A leader keeps sending heartbeat messages at regular intervals to
* maintain leadership. */
TEST(replication, sendFollowupHeartbeat, setUp, tearDown, 0, NULL)
Expand Down
97 changes: 97 additions & 0 deletions test/unit/test_flags.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include "../../src/flags.h"
#include "../lib/runner.h"

/******************************************************************************
*
* flags
*
*****************************************************************************/

SUITE(flags)

TEST(flags, empty, NULL, NULL, 0, NULL)
{
raft_flags flags = 0;
for (int i = 0; i < 64; i++) {
munit_assert_false(flagsIsSet(flags, ((raft_flags)1) << i));
}
return MUNIT_OK;
}

TEST(flags, setClear, NULL, NULL, 0, NULL)
{
raft_flags flags = 0;
raft_flags flag = 0;
for (int i = 0; i < 64; i++) {
flag = ((raft_flags)1) << i;
flags = flagsSet(flags, flag);
munit_assert_true(flagsIsSet(flags, flag));
flags = flagsClear(flags, flag);
munit_assert_false(flagsIsSet(flags, flag));
munit_assert_true(flags == 0);
}
return MUNIT_OK;
}

TEST(flags, setMultipleClearMultiple, NULL, NULL, 0, NULL)
{
raft_flags in = 0;
raft_flags out;
raft_flags flags = (raft_flags)(1 | 1 << 4 | 1 << 13 | (raft_flags)1 << 40 |
(raft_flags)1 << 63);
out = flagsSet(in, flags);
/* clang-format off */
int positions[64] = {
1, 0, 0, 0, 1, 0, 0, 0, // 0th and 4th
0, 0, 0, 0, 0, 1, 0, 0, // 13th
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
1, 0, 0, 0, 0, 0, 0, 0, // 40th
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 1, // 63th
};
/* clang-format on */
for (unsigned i = 0; i < 64; i++) {
if (positions[i]) {
munit_assert_true(flagsIsSet(out, (raft_flags)1 << i));
} else {
munit_assert_false(flagsIsSet(out, (raft_flags)1 << i));
}
}
out = flagsClear(out, flags);
munit_assert_true(out == 0);
return MUNIT_OK;
}

TEST(flags, setMultipleClearSingle, NULL, NULL, 0, NULL)
{
raft_flags in = 0;
raft_flags out;
raft_flags flags = (raft_flags)(1 << 3 | 1 << 5 | 1 << 18 |
(raft_flags)1 << 32 | (raft_flags)1 << 35);
out = flagsSet(in, flags);
/* clang-format off */
int positions[64] = {
0, 0, 0, 1, 0, 1, 0, 0, // 3rd and 5th
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1, 0, 0, 0, 0, 0, // 18th
0, 0, 0, 0, 0, 0, 0, 0,
1, 0, 0, 1, 0, 0, 0, 0, // 32rd 35th
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
};
/* clang-format on */
for (unsigned i = 0; i < 64; i++) {
if (positions[i]) {
munit_assert_true(flagsIsSet(out, (raft_flags)1 << i));
} else {
munit_assert_false(flagsIsSet(out, (raft_flags)1 << i));
}
}
out = flagsClear(out, (raft_flags)1 << 32);
munit_assert_true(
out == (raft_flags)(1 << 3 | 1 << 5 | 1 << 18 | (raft_flags)1 << 35));
return MUNIT_OK;
}

0 comments on commit 825159c

Please sign in to comment.