Skip to content

Commit

Permalink
Merge pull request databendlabs#893 from drmingdrmer/46-get-log-state
Browse files Browse the repository at this point in the history
Change: move get_log_state() from RaftLogReader to RaftStorage
  • Loading branch information
drmingdrmer authored Jul 4, 2023
2 parents c057441 + 330b1ff commit add2f7b
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 212 deletions.
44 changes: 22 additions & 22 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,28 +116,6 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {

Ok(entries)
}

async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);

let last = match last_serialized {
None => None,
Some(ent) => Some(*ent.get_log_id()),
};

let last_purged = self.last_purged_log_id.read().await.clone();

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}
}

#[async_trait]
Expand Down Expand Up @@ -194,6 +172,28 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {

#[async_trait]
impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);

let last = match last_serialized {
None => None,
Some(ent) => Some(*ent.get_log_id()),
};

let last_purged = self.last_purged_log_id.read().await.clone();

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut v = self.vote.write().await;
Expand Down
34 changes: 17 additions & 17 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,6 @@ pub struct Store {

#[async_trait]
impl RaftLogReader<TypeConfig> for Arc<Store> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
let last = log.iter().next_back().map(|(_, ent)| ent.log_id);

let last_purged = *self.last_purged_log_id.read().await;

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}

async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
range: RB,
Expand Down Expand Up @@ -180,6 +163,23 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
type LogReader = Self;
type SnapshotBuilder = Self;

async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
let last = log.iter().next_back().map(|(_, ent)| ent.log_id);

let last_purged = *self.last_purged_log_id.read().await;

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
let mut v = self.vote.write().await;
Expand Down
36 changes: 18 additions & 18 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,24 +318,6 @@ impl Store {

#[async_trait]
impl RaftLogReader<TypeConfig> for Arc<Store> {
async fn get_log_state(&mut self) -> StorageResult<LogState<TypeConfig>> {
let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| {
let (_, ent) = res.unwrap();
Some(serde_json::from_slice::<Entry<TypeConfig>>(&ent).ok()?.log_id)
});

let last_purged_log_id = self.get_last_purged_()?;

let last_log_id = match last {
None => last_purged_log_id,
Some(x) => Some(x),
};
Ok(LogState {
last_purged_log_id,
last_log_id,
})
}

async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
range: RB,
Expand Down Expand Up @@ -415,6 +397,24 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
type LogReader = Self;
type SnapshotBuilder = Self;

async fn get_log_state(&mut self) -> StorageResult<LogState<TypeConfig>> {
let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| {
let (_, ent) = res.unwrap();
Some(serde_json::from_slice::<Entry<TypeConfig>>(&ent).ok()?.log_id)
});

let last_purged_log_id = self.get_last_purged_()?;

let last_log_id = match last {
None => last_purged_log_id,
Some(x) => Some(x),
};
Ok(LogState {
last_purged_log_id,
last_log_id,
})
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
self.set_vote_(vote)
Expand Down
52 changes: 26 additions & 26 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,32 +213,6 @@ impl RaftLogReader<TypeConfig> for Arc<MemStore> {

Ok(entries)
}

async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<MemNodeId>> {
let log = self.log.read().await;
let last_serialized = log.iter().next_back().map(|(_, ent)| ent);

let last = match last_serialized {
None => None,
Some(serialized) => {
let ent: Entry<TypeConfig> =
serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?;
Some(*ent.get_log_id())
}
};

let last_purged = *self.last_purged_log_id.read().await;

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}
}

#[async_trait]
Expand Down Expand Up @@ -309,6 +283,32 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<MemStore> {

#[async_trait]
impl RaftStorage<TypeConfig> for Arc<MemStore> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<MemNodeId>> {
let log = self.log.read().await;
let last_serialized = log.iter().next_back().map(|(_, ent)| ent);

let last = match last_serialized {
None => None,
Some(serialized) => {
let ent: Entry<TypeConfig> =
serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?;
Some(*ent.get_log_id())
}
};

let last_purged = *self.last_purged_log_id.read().await;

let last = match last {
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_purged,
last_log_id: last,
})
}

#[tracing::instrument(level = "trace", skip(self))]
async fn save_vote(&mut self, vote: &Vote<MemNodeId>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!(?vote, "save_vote");
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/compat/compat07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub mod testing {
}

// get_log_state
let got = crate::RaftLogReader::get_log_state(&mut s8).await?;
let got = crate::RaftStorage::get_log_state(&mut s8).await?;
assert_eq!(
crate::LogState {
last_purged_log_id: Some(crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 5)),
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/docs/getting_started/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ Follow the link to method document to see the details.
| Kind | [`RaftStorage`] method | Return value | Description |
|------------|----------------------------------|------------------------------|---------------------------------------|
| Read log: | [`get_log_reader()`] | impl [`RaftLogReader`] | get a read-only log reader |
| | |[`get_log_state()`] | get first/last log id |
| | |[`try_get_log_entries()`] | get a range of logs |
| | [`get_log_state()`] | [`LogState`] | get first/last log id |
| Write log: | [`append_to_log()`] | () | append logs |
| Write log: | [`delete_conflict_logs_since()`] | () | delete logs `[index, +oo)` |
| Write log: | [`purge_logs_upto()`] | () | purge logs `(-oo, index]` |
Expand All @@ -115,16 +115,16 @@ Most of the APIs are quite straightforward, except two indirect APIs:
```

[`RaftLogReader`] defines the APIs to read logs, and is an also super trait of [`RaftStorage`] :
- [`get_log_state()`] get latest log state from the storage;
- [`try_get_log_entries()`] get log entries in a range;

```ignore
trait RaftLogReader<C: RaftTypeConfig> {
async fn get_log_state(&mut self) -> Result<LogState<C>, ...>;
async fn try_get_log_entries<RB: RangeBounds<u64>>(&mut self, range: RB) -> Result<Vec<C::Entry>, ...>;
}
```

And [`RaftStorage::get_log_state()`][`get_log_state()`] get latest log state from the storage;

- Build a snapshot from the local state machine needs to be done in two steps:
- [`RaftStorage::get_snapshot_builder() -> Self::SnapshotBuilder`][`get_snapshot_builder()`],
- [`RaftSnapshotBuilder::build_snapshot() -> Result<Snapshot>`][`build_snapshot()`],
Expand Down Expand Up @@ -357,12 +357,13 @@ Additionally, two test scripts for setting up a cluster are available:
[`Entry`]: `crate::entry::Entry`
[`docs::Vote`]: `crate::docs::data::Vote`
[`Vote`]: `crate::vote::Vote`
[`LogState`]: `crate::storage::LogState`

[`RaftLogReader`]: `crate::storage::RaftLogReader`
[`get_log_state()`]: `crate::storage::RaftLogReader::get_log_state`
[`try_get_log_entries()`]: `crate::storage::RaftLogReader::try_get_log_entries`

[`RaftStorage`]: `crate::storage::RaftStorage`
[`get_log_state()`]: `crate::storage::RaftStorage::get_log_state`
[`RaftStorage::LogReader`]: `crate::storage::RaftStorage::LogReader`
[`RaftStorage::SnapshotBuilder`]: `crate::storage::RaftStorage::SnapshotBuilder`
[`get_log_reader()`]: `crate::storage::RaftStorage::get_log_reader`
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/storage/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ where
C: RaftTypeConfig,
S: RaftStorage<C>,
{
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>> {
S::get_log_state(self.storage_mut().await.deref_mut()).await
}

async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
range: RB,
Expand All @@ -130,6 +126,10 @@ where
{
type LogReader = S::LogReader;

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>> {
S::get_log_state(self.storage_mut().await.deref_mut()).await
}

async fn save_vote(&mut self, vote: &Vote<C::NodeId>) -> Result<(), StorageError<C::NodeId>> {
S::save_vote(self.storage_mut().await.deref_mut(), vote).await
}
Expand Down
18 changes: 9 additions & 9 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,6 @@ pub struct LogState<C: RaftTypeConfig> {
pub trait RaftLogReader<C>: Send + Sync + 'static
where C: RaftTypeConfig
{
/// Returns the last deleted log id and the last log id.
///
/// The impl should not consider the applied log id in state machine.
/// The returned `last_log_id` could be the log id of the last present log entry, or the
/// `last_purged_log_id` if there is no entry at all.
// NOTE: This can be made into sync, provided all state machines will use atomic read or the
// like.
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>>;

/// Get a series of log entries from storage.
///
/// The start value is inclusive in the search and the stop value is non-inclusive: `[start,
Expand Down Expand Up @@ -219,6 +210,15 @@ where C: RaftTypeConfig

// --- Log

/// Returns the last deleted log id and the last log id.
///
/// The impl should **not** consider the applied log id in state machine.
/// The returned `last_log_id` could be the log id of the last present log entry, or the
/// `last_purged_log_id` if there is no entry at all.
// NOTE: This can be made into sync, provided all state machines will use atomic read or the
// like.
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>>;

/// Get the log reader.
///
/// The method is intentionally async to give the implementation a chance to use asynchronous
Expand Down
10 changes: 10 additions & 0 deletions openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use macros::add_async_trait;
use crate::storage::callback::LogFlushed;
use crate::storage::v2::sealed::Sealed;
use crate::LogId;
use crate::LogState;
use crate::OptionalSend;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
Expand Down Expand Up @@ -51,6 +52,15 @@ where C: RaftTypeConfig
/// nodes.
type LogReader: RaftLogReader<C>;

/// Returns the last deleted log id and the last log id.
///
/// The impl should **not** consider the applied log id in state machine.
/// The returned `last_log_id` could be the log id of the last present log entry, or the
/// `last_purged_log_id` if there is no entry at all.
// NOTE: This can be made into sync, provided all state machines will use atomic read or the
// like.
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C::NodeId>>;

/// Get the log reader.
///
/// The method is intentionally async to give the implementation a chance to use asynchronous
Expand Down
Loading

0 comments on commit add2f7b

Please sign in to comment.