Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
flags: Add features field to AppendEntriesResultRPC.
Browse files Browse the repository at this point in the history
A leader always first sends an empty AppendEntriesRPC to a follower, a
heartbeat. The heartbeat response from the node will contain a features
field, advertising the capabilities of a node. Based on the features
the leader can decide to send specific messages or message formats to a
node.

Signed-off-by: Mathieu Borderé <[email protected]>
  • Loading branch information
Mathieu Borderé committed Jun 6, 2023
1 parent 86eb348 commit 628a743
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 3 deletions.
3 changes: 2 additions & 1 deletion include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ struct raft_append_entries_result
raft_term term; /* Receiver's current_term. */
raft_index rejected; /* If non-zero, the index that was rejected. */
raft_index last_log_index; /* Receiver's last log entry index, as hint. */
raft_flags features; /* Feature flags. */
};
#define RAFT_APPEND_ENTRIES_RESULT_VERSION 0
#define RAFT_APPEND_ENTRIES_RESULT_VERSION 1

/**
* Hold the arguments of an InstallSnapshot RPC (figure 5.3).
Expand Down
13 changes: 13 additions & 0 deletions src/progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ static void initProgress(struct raft_progress *p, raft_index last_index)
p->snapshot_last_send = 0;
p->recent_recv = false;
p->state = PROGRESS__PROBE;
p->features = 0;
}

int progressBuildArray(struct raft *r)
Expand Down Expand Up @@ -174,6 +175,18 @@ void progressMarkRecentRecv(struct raft *r, const unsigned i)
r->leader_state.progress[i].recent_recv = true;
}

inline void progressSetFeatures(struct raft *r,
const unsigned i,
raft_flags features)
{
r->leader_state.progress[i].features = features;
}

inline raft_flags progressGetFeatures(struct raft *r, const unsigned i)
{
return r->leader_state.progress[i].features;
}

bool progressGetRecentRecv(const struct raft *r, const unsigned i)
{
return r->leader_state.progress[i].recent_recv;
Expand Down
9 changes: 8 additions & 1 deletion src/progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ struct raft_progress
raft_index snapshot_index; /* Last index of most recent snapshot sent. */
raft_time last_send; /* Timestamp of last AppendEntries RPC. */
raft_time snapshot_last_send; /* Timestamp of last InstallSnaphot RPC. */
bool recent_recv; /* A msg was received within election timeout. */
bool recent_recv; /* A msg was received within election timeout. */
raft_flags features; /* What the server is capable of. */
};

/* Create and initialize the array of progress objects used by the leader to *
Expand Down Expand Up @@ -122,4 +123,10 @@ bool progressMaybeDecrement(struct raft *r,
/* Return true if match_index is equal or higher than the snapshot_index. */
bool progressSnapshotDone(struct raft *r, unsigned i);

/* Sets the feature flags of a node. */
void progressSetFeatures(struct raft *r, const unsigned i, raft_flags features);

/* Gets the feature flags of a node. */
raft_flags progressGetFeatures(struct raft *r, const unsigned i);

#endif /* PROGRESS_H_ */
1 change: 1 addition & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "convert.h"
#include "election.h"
#include "err.h"
#include "flags.h"
#include "heap.h"
#include "log.h"
#include "membership.h"
Expand Down
2 changes: 2 additions & 0 deletions src/recv_append_entries.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "assert.h"
#include "convert.h"
#include "entry.h"
#include "flags.h"
#include "heap.h"
#include "log.h"
#include "recv.h"
Expand Down Expand Up @@ -42,6 +43,7 @@ int recvAppendEntries(struct raft *r,
result->rejected = args->prev_log_index;
result->last_log_index = logLastIndex(r->log);
result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result->features = RAFT_DEFAULT_FEATURE_FLAGS;

rv = recvEnsureMatchingTerms(r, args->term, &match);
if (rv != 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/recv_install_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "assert.h"
#include "convert.h"
#include "flags.h"
#include "log.h"
#include "recv.h"
#include "replication.h"
Expand Down Expand Up @@ -37,6 +38,7 @@ int recvInstallSnapshot(struct raft *r,
result->rejected = args->last_index;
result->last_log_index = logLastIndex(r->log);
result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result->features = RAFT_DEFAULT_FEATURE_FLAGS;

rv = recvEnsureMatchingTerms(r, args->term, &match);
if (rv != 0) {
Expand Down
5 changes: 5 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "error.h"
#endif
#include "err.h"
#include "flags.h"
#include "heap.h"
#include "lifecycle.h"
#include "log.h"
Expand Down Expand Up @@ -685,6 +686,8 @@ int replicationUpdate(struct raft *r,

progressMarkRecentRecv(r, i);

progressSetFeatures(r, i, result->features);

/* If the RPC failed because of a log mismatch, retry.
*
* From Figure 3.1:
Expand Down Expand Up @@ -850,6 +853,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status)

result.term = r->current_term;
result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result.features = RAFT_DEFAULT_FEATURE_FLAGS;
if (status != 0) {
if (r->state != RAFT_FOLLOWER) {
tracef("local server is not follower -> ignore I/O failure");
Expand Down Expand Up @@ -1219,6 +1223,7 @@ static void installSnapshotCb(struct raft_io_snapshot_put *req, int status)

result.term = r->current_term;
result.version = RAFT_APPEND_ENTRIES_RESULT_VERSION;
result.features = RAFT_DEFAULT_FEATURE_FLAGS;
result.rejected = 0;

/* If we are shutting down, let's discard the result. */
Expand Down
13 changes: 12 additions & 1 deletion src/uv_encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,18 @@ static size_t sizeofAppendEntries(const struct raft_append_entries *p)
16 * p->n_entries /* One header per entry */;
}

static size_t sizeofAppendEntriesResult(void)
static size_t sizeofAppendEntriesResultV0(void)
{
return sizeof(uint64_t) + /* Term. */
sizeof(uint64_t) + /* Success. */
sizeof(uint64_t) /* Last log index. */;
}

static size_t sizeofAppendEntriesResult(void)
{
return sizeofAppendEntriesResultV0() + sizeof(uint64_t) /* 64 bit Flags. */;
}

static size_t sizeofInstallSnapshot(const struct raft_install_snapshot *p)
{
size_t conf_size = configurationEncodedSize(&p->conf);
Expand Down Expand Up @@ -141,6 +146,7 @@ static void encodeAppendEntriesResult(
bytePut64(&cursor, p->term);
bytePut64(&cursor, p->rejected);
bytePut64(&cursor, p->last_log_index);
bytePut64(&cursor, p->features);
}

static void encodeInstallSnapshot(const struct raft_install_snapshot *p,
Expand Down Expand Up @@ -435,6 +441,11 @@ static void decodeAppendEntriesResult(const uv_buf_t *buf,
p->term = byteGet64(&cursor);
p->rejected = byteGet64(&cursor);
p->last_log_index = byteGet64(&cursor);
p->features = 0;
if (buf->len > sizeofAppendEntriesResultV0()) {
p->version = 1;
p->features = byteGet64(&cursor);
}
}

static int decodeInstallSnapshot(const uv_buf_t *buf,
Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_replication.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "../../src/configuration.h"
#include "../../src/flags.h"
#include "../../src/progress.h"
#include "../lib/cluster.h"
#include "../lib/runner.h"
Expand Down Expand Up @@ -126,6 +127,40 @@ TEST(replication, sendInitialHeartbeat, setUp, tearDown, 0, NULL)
return MUNIT_OK;
}

/* After receiving an AppendEntriesResult, a leader has set the feature flags of
* a node. */
TEST(replication, receiveFlags, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct raft *raft;
CLUSTER_BOOTSTRAP;
CLUSTER_START;

/* Server 0 becomes leader and sends the initial heartbeat. */
CLUSTER_STEP_N(24);
ASSERT_LEADER(0);
ASSERT_TIME(1030);

/* Flags is empty */
raft = CLUSTER_RAFT(0);
munit_assert_ullong(raft->leader_state.progress[1].features, ==, 0);

raft = CLUSTER_RAFT(1);
/* Server 1 receives the first heartbeat. */
CLUSTER_STEP_N(4);
munit_assert_int(raft->election_timer_start, ==, 1045);
munit_assert_int(CLUSTER_N_RECV(1, RAFT_IO_APPEND_ENTRIES), ==, 1);

/* Server 0 receives the reply to the heartbeat. */
CLUSTER_STEP_N(2);
munit_assert_int(CLUSTER_N_RECV(0, RAFT_IO_APPEND_ENTRIES_RESULT), ==, 1);
raft = CLUSTER_RAFT(0);
munit_assert_ullong(raft->leader_state.progress[1].features, ==,
RAFT_DEFAULT_FEATURE_FLAGS);

return MUNIT_OK;
}

/* A leader keeps sending heartbeat messages at regular intervals to
* maintain leadership. */
TEST(replication, sendFollowupHeartbeat, setUp, tearDown, 0, NULL)
Expand Down

0 comments on commit 628a743

Please sign in to comment.