Skip to content

Commit

Permalink
Feature: add Raft::ensure_linearizable() to ensure linearizable read
Browse files Browse the repository at this point in the history
The `Raft::is_leader()` method does not fully ensure linearizable read
operations and is deprecated in this version. Instead, applications
should use the `Raft::ensure_linearizable()` method to guarantee
linearizability.

Under the hood, `Raft::ensure_linearizable()` obtains a `ReadIndex` from
`RaftCore` if it remains the leader, and blocks until the state
machine applies up to the `ReadIndex`. This process ensures that the
application observes all state visible to a preceding read operation.

- Fix: databendlabs#965

Upgrade tip:

Replace `Raft::is_leader()` with `Raft::ensure_linearizable()`.
  • Loading branch information
drmingdrmer committed Dec 8, 2023
1 parent dec6e32 commit 79372b4
Show file tree
Hide file tree
Showing 16 changed files with 464 additions and 42 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl R

#[post("/consistent_read")]
pub async fn consistent_read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl Responder> {
let ret = app.raft.is_leader().await;
let ret = app.raft.ensure_linearizable().await;

match ret {
Ok(_) => {
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn read(mut req: Request<Arc<App>>) -> tide::Result {
}

async fn consistent_read(mut req: Request<Arc<App>>) -> tide::Result {
let ret = req.state().raft.is_leader().await;
let ret = req.state().raft.ensure_linearizable().await;

match ret {
Ok(_) => {
Expand Down
10 changes: 8 additions & 2 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,14 @@ pub type MemNodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for `MemStore`.
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
pub TypeConfig:
D = ClientRequest,
R = ClientResponse,
NodeId = MemNodeId,
Node = (),
Entry = Entry<TypeConfig>,
SnapshotData = Cursor<Vec<u8>>,
AsyncRuntime = TokioRuntime
);

/// The application snapshot type which the `MemStore` works with.
Expand Down
26 changes: 17 additions & 9 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::core::command_state::CommandState;
use crate::core::notify::Notify;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ClientReadTx;
use crate::core::raft_msg::ClientWriteTx;
use crate::core::raft_msg::InstallSnapshotTx;
use crate::core::raft_msg::RaftMsg;
Expand All @@ -45,7 +46,6 @@ use crate::engine::Engine;
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
Expand Down Expand Up @@ -263,12 +263,19 @@ where
// TODO: the second condition is such a read request can only read from state machine only when the last log it sees
// at `T1` is committed.
#[tracing::instrument(level = "trace", skip(self, tx))]
pub(super) async fn handle_check_is_leader_request(
&mut self,
tx: ResultSender<(), CheckIsLeaderError<C::NodeId, C::Node>>,
) {
pub(super) async fn handle_check_is_leader_request(&mut self, tx: ClientReadTx<C>) {
// Setup sentinel values to track when we've received majority confirmation of leadership.

let resp = {
let read_log_id = self.engine.state.get_read_log_id().copied();

// TODO: this applied is a little stale when being returned to client.
// Fix this when the following heartbeats are replaced with calling RaftNetwork.
let applied = self.engine.state.io_applied().copied();

(read_log_id, applied)
};

let my_id = self.id;
let my_vote = *self.engine.state.vote_ref();
let ttl = Duration::from_millis(self.config.heartbeat_interval);
Expand All @@ -278,7 +285,7 @@ where
let mut granted = btreeset! {my_id};

if eff_mem.is_quorum(granted.iter()) {
let _ = tx.send(Ok(()));
let _ = tx.send(Ok(resp));
return;
}

Expand Down Expand Up @@ -351,7 +358,7 @@ where
}
};

// If we receive a response with a greater term, then revert to follower and abort this
// If we receive a response with a greater vote, then revert to follower and abort this
// request.
if let AppendEntriesResponse::HigherVote(vote) = append_res {
debug_assert!(
Expand All @@ -368,7 +375,7 @@ where
});

if let Err(_e) = send_res {
tracing::error!("fail to send HigherVote to raft core");
tracing::error!("fail to send HigherVote to RaftCore");
}

// we are no longer leader so error out early
Expand All @@ -380,7 +387,7 @@ where
granted.insert(target);

if eff_mem.is_quorum(granted.iter()) {
let _ = tx.send(Ok(()));
let _ = tx.send(Ok(resp));
return;
}
}
Expand All @@ -395,6 +402,7 @@ where
.into()));
};

// TODO: do not spawn, manage read requests with a queue by RaftCore
C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / cluster-benchmark (cluster_benchmark)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (memstore)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / Build (nightly, bench,serde,bt,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (sledstore)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test-bench (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader,serde,singlethreaded)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, serde)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 0, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / openraft-test (nightly, 30)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, loosen-follower-log-revert)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / tests-feature-test (nightly, single-term-leader)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / external-stores (stores/rocksstore-v2)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore)

unused implementer of `futures::Future` that must be used

Check warning on line 406 in openraft/src/core/raft_core.rs

View workflow job for this annotation

GitHub Actions / stores (rocksstore-compat07)

unused implementer of `futures::Future` that must be used
}

Expand Down
12 changes: 9 additions & 3 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::NodeOf;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftTypeConfig;
Expand All @@ -35,8 +38,11 @@ pub(crate) type VoteTx<NID> = ResultSender<VoteResponse<NID>, Infallible>;
pub(crate) type AppendEntriesTx<NID> = ResultSender<AppendEntriesResponse<NID>, Infallible>;

/// TX for Client Write Response
pub(crate) type ClientWriteTx<C> =
ResultSender<ClientWriteResponse<C>, ClientWriteError<<C as RaftTypeConfig>::NodeId, <C as RaftTypeConfig>::Node>>;
pub(crate) type ClientWriteTx<C> = ResultSender<ClientWriteResponse<C>, ClientWriteError<NodeIdOf<C>, NodeOf<C>>>;

/// TX for Linearizable Read Response
///
/// On success the payload is the pair `(read_log_id, applied)`:
/// - `read_log_id`: the log id the state machine has to apply up to before serving the read;
/// - `applied`: the last applied log id observed by `RaftCore` when the request was handled.
///
/// Both are `Option`s because either log id may be absent.
pub(crate) type ClientReadTx<C> =
ResultSender<(Option<LogIdOf<C>>, Option<LogIdOf<C>>), CheckIsLeaderError<NodeIdOf<C>, NodeOf<C>>>;

/// A message sent by application to the [`RaftCore`].
///
Expand Down Expand Up @@ -65,7 +71,7 @@ where C: RaftTypeConfig
},

CheckIsLeaderRequest {
tx: ResultSender<(), CheckIsLeaderError<C::NodeId, C::Node>>,
tx: ClientReadTx<C>,
},

Initialize {
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/docs/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! The protocol used by Openraft to replicate data.

// Documentation-only module: its rustdoc content is pulled in from `read.md`.
pub mod read {
#![doc = include_str!("read.md")]
}

pub mod replication {
#![doc = include_str!("replication.md")]

Expand Down
74 changes: 74 additions & 0 deletions openraft/src/docs/protocol/read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Read Operations

Read operations within a Raft cluster are guaranteed to be linearizable.

This ensures that for any two read operations,
`A` and `B`, if `B` occurs after `A` by wall clock time,
then `B` will observe the same state as `A`, or a state that includes subsequent changes,
regardless of which node within the cluster each operation is performed on.

In Openraft, `read_index` is the same as the `read_index` in the original Raft paper.
Openraft also uses `read_log_id` instead of `read_index`.

## Ensuring linearizability

To ensure linearizability, read operations must perform a [`get_read_log_id()`] operation on the leader before proceeding.

This method confirms that this node is the leader at the time of invocation by sending heartbeats to a quorum of followers, and returns `(read_log_id, last_applied_log_id)`:
- `read_log_id` represents the log id up to which the state machine should apply to ensure a
linearizable read,
- `last_applied_log_id` is the last applied log id.

The caller then waits for `last_applied_log_id` to catch up with `read_log_id`, which can be done by subscribing to [`Raft::metrics`],
and finally proceeds with the state machine read.

The above steps are encapsulated in the [`ensure_linearizable()`] method.

## Examples

```ignore
my_raft.ensure_linearizable().await?;
proceed_with_state_machine_read();
```

The above snippet does the same as the following:

```ignore
let (read_log_id, applied) = self.get_read_log_id().await?;
if read_log_id.index() > applied.index() {
self.wait(None).applied_index_at_least(read_log_id.index(), "").await?
}
proceed_with_state_machine_read();
```

The comparison `read_log_id > applied_log_id` would also be valid in the above example.


## Ensuring Linearizability with `read_log_id`

The `read_log_id` is determined as the maximum of the `last_committed_log_id` and the
log id of the first log entry in the current leader's term (the "blank" log entry).

Assume that an earlier read operation read from the state machine with logs applied up to log id `A`.
Since the leader has all committed entries up to its initial blank log entry,
we have: `read_log_id >= A`.

When the `last_applied_log_id` meets or exceeds `read_log_id`,
the state machine contains all state up to `A`. Therefore, a linearizable read is assured
when `last_applied_log_id >= read_log_id`.


## Ensuring Linearizability with `read_index`

It is also valid to compare only the indexes, i.e., `last_applied_log_id.index() >= read_log_id.index()`,
due to the guarantee that committed logs will not be lost.

Since a previous read could only have observed committed logs, and `read_log_id.index()` is
at least as large as any committed log, once `last_applied_log_id.index() >= read_log_id.index()`, the state machine is assured to reflect all entries seen by any past read.


[`ensure_linearizable()`]: crate::Raft::ensure_linearizable
[`get_read_log_id()`]: crate::Raft::get_read_log_id
[`Raft::metrics`]: crate::Raft::metrics
22 changes: 19 additions & 3 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ impl<NID: NodeId> LogIdList<NID> {
/// Get the log id at the specified index.
///
/// It will return `last_purged_log_id` if index is at the last purged index.
#[allow(dead_code)]
pub(crate) fn get(&self, index: u64) -> Option<LogId<NID>> {
let res = self.key_log_ids.binary_search_by(|log_id| log_id.index.cmp(&index));

Expand All @@ -285,17 +284,34 @@ impl<NID: NodeId> LogIdList<NID> {
}
}

#[allow(dead_code)]
/// Returns a reference to the first key log id, or `None` if there are no key log ids.
pub(crate) fn first(&self) -> Option<&LogId<NID>> {
self.key_log_ids.first()
}

#[allow(dead_code)]
/// Returns a reference to the last key log id, or `None` if there are no key log ids.
pub(crate) fn last(&self) -> Option<&LogId<NID>> {
self.key_log_ids.last()
}

/// Returns all stored key log ids as a slice.
pub(crate) fn key_log_ids(&self) -> &[LogId<NID>] {
&self.key_log_ids
}

/// Returns the key log ids that were appended by the most recent leader.
///
/// The result is a suffix of the key log ids holding at most two entries: the last key log id,
/// preceded by its neighbour when both share the same `leader_id`. With fewer than two key log
/// ids, the whole list is returned.
///
/// Note that the 0-th log is a membership log that initializes a cluster and does not belong to
/// any leader; this method does not treat it specially.
pub(crate) fn by_last_leader(&self) -> &[LogId<NID>] {
    let key_ids: &[LogId<NID>] = &self.key_log_ids;

    match key_ids {
        // At most two (adjacent) key log ids can share a leader_id, so checking the final pair
        // is enough.
        [.., prev, last] if last.leader_id() == prev.leader_id() => &key_ids[key_ids.len() - 2..],
        // The last leader owns only the final key log id (also covers a single-element list).
        [.., _] => &key_ids[key_ids.len() - 1..],
        [] => key_ids,
    }
}
}
27 changes: 26 additions & 1 deletion openraft/src/engine/tests/log_id_list_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::engine::LogIdList;
use crate::testing::log_id;

#[test]
fn test_log_id_list_extend_from_same_leader() -> anyhow::Result<()> {
Expand Down Expand Up @@ -350,4 +351,28 @@ fn test_log_id_list_get_log_id() -> anyhow::Result<()> {

Ok(())
}
use crate::testing::log_id;

/// Checks `LogIdList::by_last_leader()` for lists of zero, one, two and three key log ids,
/// covering both the case where the last leader owns one key log id and where it owns two.
#[test]
fn test_log_id_list_by_last_leader() -> anyhow::Result<()> {
// len == 0
let ids = LogIdList::<u64>::default();
assert_eq!(ids.by_last_leader(), &[]);

// len == 1
let ids = LogIdList::<u64>::new([log_id(1, 1, 1)]);
assert_eq!(&[log_id(1, 1, 1)], ids.by_last_leader());

// len == 2, the last leader has only one log
let ids = LogIdList::<u64>::new([log_id(1, 1, 1), log_id(3, 1, 3)]);
assert_eq!(&[log_id(3, 1, 3)], ids.by_last_leader());

// len == 2, the last leader has two logs
let ids = LogIdList::<u64>::new([log_id(1, 1, 1), log_id(1, 1, 3)]);
assert_eq!(&[log_id(1, 1, 1), log_id(1, 1, 3)], ids.by_last_leader());

// len > 2, the last leader has more than one log
let ids = LogIdList::<u64>::new([log_id(1, 1, 1), log_id(7, 1, 8), log_id(7, 1, 10)]);
assert_eq!(&[log_id(7, 1, 8), log_id(7, 1, 10)], ids.by_last_leader());

Ok(())
}
Loading

0 comments on commit 79372b4

Please sign in to comment.