diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index 141a713ac..88e5fc95c 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -116,28 +116,6 @@ impl RaftLogReader for Arc { Ok(entries) } - - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.read().await; - let last_serialized = log.iter().rev().next().map(|(_, ent)| ent); - - let last = match last_serialized { - None => None, - Some(ent) => Some(*ent.get_log_id()), - }; - - let last_purged = self.last_purged_log_id.read().await.clone(); - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } } #[async_trait] @@ -194,6 +172,28 @@ impl RaftSnapshotBuilder for Arc { #[async_trait] impl RaftLogStorage for Arc { + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.read().await; + let last_serialized = log.iter().rev().next().map(|(_, ent)| ent); + + let last = match last_serialized { + None => None, + Some(ent) => Some(*ent.get_log_id()), + }; + + let last_purged = self.last_purged_log_id.read().await.clone(); + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut v = self.vote.write().await; diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index ded553967..c30aedd32 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -96,23 +96,6 @@ pub struct Store { #[async_trait] impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.read().await; - let last = log.iter().next_back().map(|(_, ent)| ent.log_id); - - let last_purged = *self.last_purged_log_id.read().await; - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -180,6 +163,23 @@ impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.read().await; + let last = log.iter().next_back().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.read().await; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { let mut v = self.vote.write().await; diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 178ac9969..970ae7441 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -318,24 +318,6 @@ impl Store { #[async_trait] impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { - let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| { - let (_, ent) = res.unwrap(); - Some(serde_json::from_slice::>(&ent).ok()?.log_id) - }); - - let last_purged_log_id = self.get_last_purged_()?; - - let last_log_id = match last { - None => last_purged_log_id, - Some(x) => Some(x), - }; - Ok(LogState { - last_purged_log_id, - last_log_id, - }) - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -415,6 +397,24 @@ impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; + async fn get_log_state(&mut self) -> StorageResult> { + let last = self.db.iterator_cf(self.logs(), rocksdb::IteratorMode::End).next().and_then(|res| { + let (_, ent) = res.unwrap(); + Some(serde_json::from_slice::>(&ent).ok()?.log_id) + }); + + let last_purged_log_id = self.get_last_purged_()?; + + let last_log_id = match last { + None => last_purged_log_id, + Some(x) => Some(x), + }; + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.set_vote_(vote) diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 3e3eb7e98..9d7a079d2 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -213,32 +213,6 @@ impl RaftLogReader for Arc { Ok(entries) } - - async fn get_log_state(&mut self) -> Result, StorageError> { - let log = self.log.read().await; - let last_serialized = log.iter().next_back().map(|(_, ent)| ent); - - let last = match last_serialized { - None => None, - Some(serialized) => { - let ent: Entry = - serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?; - Some(*ent.get_log_id()) - } - }; - - let last_purged = *self.last_purged_log_id.read().await; - - let last = match last { - None => last_purged, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last, - }) - } } #[async_trait] @@ -309,6 +283,32 @@ impl RaftSnapshotBuilder for Arc { #[async_trait] impl RaftStorage for Arc { + async fn get_log_state(&mut self) -> Result, StorageError> { + let log = self.log.read().await; + let last_serialized = log.iter().next_back().map(|(_, ent)| ent); + + let last = match last_serialized { + None => None, + Some(serialized) => { + let ent: Entry = + serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?; + Some(*ent.get_log_id()) + } + }; + + let last_purged = *self.last_purged_log_id.read().await; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { tracing::debug!(?vote, "save_vote"); diff --git a/openraft/src/compat/compat07.rs b/openraft/src/compat/compat07.rs index fc19e4fee..922e147c3 100644 --- a/openraft/src/compat/compat07.rs +++ b/openraft/src/compat/compat07.rs @@ -314,7 +314,7 @@ pub mod testing { } // get_log_state - let got = crate::RaftLogReader::get_log_state(&mut s8).await?; + let got = crate::RaftStorage::get_log_state(&mut s8).await?; assert_eq!( crate::LogState { last_purged_log_id: Some(crate::LogId::new(crate::CommittedLeaderId::new(1, 0), 5)), diff --git a/openraft/src/docs/getting_started/getting-started.md b/openraft/src/docs/getting_started/getting-started.md index 2df1aeef6..fa69cf0d6 100644 --- a/openraft/src/docs/getting_started/getting-started.md +++ b/openraft/src/docs/getting_started/getting-started.md @@ -87,8 +87,8 @@ Follow the link to method document to see the details. | Kind | [`RaftStorage`] method | Return value | Description | |------------|----------------------------------|------------------------------|---------------------------------------| | Read log: | [`get_log_reader()`] | impl [`RaftLogReader`] | get a read-only log reader | -| | | ↳ [`get_log_state()`] | get first/last log id | | | | ↳ [`try_get_log_entries()`] | get a range of logs | +| | [`get_log_state()`] | [`LogState`] | get first/last log id | | Write log: | [`append_to_log()`] | () | append logs | | Write log: | [`delete_conflict_logs_since()`] | () | delete logs `[index, +oo)` | | Write log: | [`purge_logs_upto()`] | () | purge logs `(-oo, index]` | @@ -115,16 +115,16 @@ Most of the APIs are quite straightforward, except two indirect APIs: ``` [`RaftLogReader`] defines the APIs to read logs, and is an also super trait of [`RaftStorage`] : - - [`get_log_state()`] get latest log state from the storage; - [`try_get_log_entries()`] get log entries in a range; ```ignore trait RaftLogReader { - async fn get_log_state(&mut self) -> Result, ...>; async fn try_get_log_entries>(&mut self, range: RB) -> Result, ...>; } ``` + And [`RaftStorage::get_log_state()`][`get_log_state()`] get latest log state from the storage; + - Build a snapshot from the local state machine needs to be done in two steps: - [`RaftStorage::get_snapshot_builder() -> Self::SnapshotBuilder`][`get_snapshot_builder()`], - [`RaftSnapshotBuilder::build_snapshot() -> Result`][`build_snapshot()`], @@ -357,12 +357,13 @@ Additionally, two test scripts for setting up a cluster are available: [`Entry`]: `crate::entry::Entry` [`docs::Vote`]: `crate::docs::data::Vote` [`Vote`]: `crate::vote::Vote` +[`LogState`]: `crate::storage::LogState` [`RaftLogReader`]: `crate::storage::RaftLogReader` -[`get_log_state()`]: `crate::storage::RaftLogReader::get_log_state` [`try_get_log_entries()`]: `crate::storage::RaftLogReader::try_get_log_entries` [`RaftStorage`]: `crate::storage::RaftStorage` +[`get_log_state()`]: `crate::storage::RaftStorage::get_log_state` [`RaftStorage::LogReader`]: `crate::storage::RaftStorage::LogReader` [`RaftStorage::SnapshotBuilder`]: `crate::storage::RaftStorage::SnapshotBuilder` [`get_log_reader()`]: `crate::storage::RaftStorage::get_log_reader` diff --git a/openraft/src/storage/adapter.rs b/openraft/src/storage/adapter.rs index 4f25eba4d..9a72e0426 100644 --- a/openraft/src/storage/adapter.rs +++ b/openraft/src/storage/adapter.rs @@ -103,10 +103,6 @@ where C: RaftTypeConfig, S: RaftStorage, { - async fn get_log_state(&mut self) -> Result, StorageError> { - S::get_log_state(self.storage_mut().await.deref_mut()).await - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -130,6 +126,10 @@ where { type LogReader = S::LogReader; + async fn get_log_state(&mut self) -> Result, StorageError> { + S::get_log_state(self.storage_mut().await.deref_mut()).await + } + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { S::save_vote(self.storage_mut().await.deref_mut(), vote).await } diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index b28405187..a138b6eb6 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -141,15 +141,6 @@ pub struct LogState { pub trait RaftLogReader: Send + Sync + 'static where C: RaftTypeConfig { - /// Returns the last deleted log id and the last log id. - /// - /// The impl should not consider the applied log id in state machine. - /// The returned `last_log_id` could be the log id of the last present log entry, or the - /// `last_purged_log_id` if there is no entry at all. - // NOTE: This can be made into sync, provided all state machines will use atomic read or the - // like. - async fn get_log_state(&mut self) -> Result, StorageError>; - /// Get a series of log entries from storage. /// /// The start value is inclusive in the search and the stop value is non-inclusive: `[start, @@ -219,6 +210,15 @@ where C: RaftTypeConfig // --- Log + /// Returns the last deleted log id and the last log id. + /// + /// The impl should **not** consider the applied log id in state machine. + /// The returned `last_log_id` could be the log id of the last present log entry, or the + /// `last_purged_log_id` if there is no entry at all. + // NOTE: This can be made into sync, provided all state machines will use atomic read or the + // like. + async fn get_log_state(&mut self) -> Result, StorageError>; + /// Get the log reader. /// /// The method is intentionally async to give the implementation a chance to use asynchronous diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index a2a85e39b..4087ec80c 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -7,6 +7,7 @@ use macros::add_async_trait; use crate::storage::callback::LogFlushed; use crate::storage::v2::sealed::Sealed; use crate::LogId; +use crate::LogState; use crate::OptionalSend; use crate::RaftLogReader; use crate::RaftSnapshotBuilder; @@ -51,6 +52,15 @@ where C: RaftTypeConfig /// nodes. type LogReader: RaftLogReader; + /// Returns the last deleted log id and the last log id. + /// + /// The impl should **not** consider the applied log id in state machine. + /// The returned `last_log_id` could be the log id of the last present log entry, or the + /// `last_purged_log_id` if there is no entry at all. + // NOTE: This can be made into sync, provided all state machines will use atomic read or the + // like. + async fn get_log_state(&mut self) -> Result, StorageError>; + /// Get the log reader. /// /// The method is intentionally async to give the implementation a chance to use asynchronous diff --git a/rocksstore-compat07/src/lib.rs b/rocksstore-compat07/src/lib.rs index e1826ef36..90b497d15 100644 --- a/rocksstore-compat07/src/lib.rs +++ b/rocksstore-compat07/src/lib.rs @@ -370,40 +370,6 @@ impl RocksStore { #[async_trait] impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { - let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); - - let last_log_id = match last { - None => None, - Some(res) => { - let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; - let ent = ent.upgrade(); - Some(ent.log_id) - } - }; - - let last_purged_log_id = self.get_meta_vec::()?; - let last_purged_log_id = match last_purged_log_id { - None => None, - Some(bs) => { - let log_id = serde_json::from_slice::(&bs).map_err(read_logs_err)?; - Some(log_id.upgrade()) - } - }; - - let last_log_id = match last_log_id { - None => last_purged_log_id, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id, - last_log_id, - }) - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -488,6 +454,40 @@ impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; + async fn get_log_state(&mut self) -> StorageResult> { + let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); + + let last_log_id = match last { + None => None, + Some(res) => { + let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; + + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + let ent = ent.upgrade(); + Some(ent.log_id) + } + }; + + let last_purged_log_id = self.get_meta_vec::()?; + let last_purged_log_id = match last_purged_log_id { + None => None, + Some(bs) => { + let log_id = serde_json::from_slice::(&bs).map_err(read_logs_err)?; + Some(log_id.upgrade()) + } + }; + + let last_log_id = match last_log_id { + None => last_purged_log_id, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.put_meta::(vote) diff --git a/rocksstore/src/lib.rs b/rocksstore/src/lib.rs index 94b06f79b..7cd8a25a5 100644 --- a/rocksstore/src/lib.rs +++ b/rocksstore/src/lib.rs @@ -340,31 +340,6 @@ impl RocksStore { #[async_trait] impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { - let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); - - let last_log_id = match last { - None => None, - Some(res) => { - let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; - Some(ent.log_id) - } - }; - - let last_purged_log_id = self.get_meta::()?; - - let last_log_id = match last_log_id { - None => last_purged_log_id, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id, - last_log_id, - }) - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -448,6 +423,31 @@ impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; + async fn get_log_state(&mut self) -> StorageResult> { + let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); + + let last_log_id = match last { + None => None, + Some(res) => { + let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + Some(ent.log_id) + } + }; + + let last_purged_log_id = self.get_meta::()?; + + let last_log_id = match last_log_id { + None => last_purged_log_id, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.put_meta::(vote) diff --git a/sledstore/src/lib.rs b/sledstore/src/lib.rs index 12b76d217..757cd176e 100644 --- a/sledstore/src/lib.rs +++ b/sledstore/src/lib.rs @@ -386,30 +386,6 @@ impl SledStore { #[async_trait] impl RaftLogReader for Arc { - async fn get_log_state(&mut self) -> StorageResult> { - let last_purged = self.get_last_purged_()?; - - let logs_tree = logs(&self.db); - let last_ivec_kv = logs_tree.last().map_err(read_logs_err)?; - let (_, ent_ivec) = if let Some(last) = last_ivec_kv { - last - } else { - return Ok(LogState { - last_purged_log_id: last_purged, - last_log_id: last_purged, - }); - }; - - let last_ent = serde_json::from_slice::>(&ent_ivec).map_err(read_logs_err)?; - let last_log_id = Some(*last_ent.get_log_id()); - - let last_log_id = std::cmp::max(last_log_id, last_purged); - Ok(LogState { - last_purged_log_id: last_purged, - last_log_id, - }) - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -494,6 +470,30 @@ impl RaftStorage for Arc { type LogReader = Self; type SnapshotBuilder = Self; + async fn get_log_state(&mut self) -> StorageResult> { + let last_purged = self.get_last_purged_()?; + + let logs_tree = logs(&self.db); + let last_ivec_kv = logs_tree.last().map_err(read_logs_err)?; + let (_, ent_ivec) = if let Some(last) = last_ivec_kv { + last + } else { + return Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last_purged, + }); + }; + + let last_ent = serde_json::from_slice::>(&ent_ivec).map_err(read_logs_err)?; + let last_log_id = Some(*last_ent.get_log_id()); + + let last_log_id = std::cmp::max(last_log_id, last_purged); + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id, + }) + } + #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.set_vote_(vote).await diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs index ea5f45f5b..4f85945ee 100644 --- a/stores/rocksstore-v2/src/lib.rs +++ b/stores/rocksstore-v2/src/lib.rs @@ -233,31 +233,6 @@ impl RocksLogStore { #[async_trait] impl RaftLogReader for RocksLogStore { - async fn get_log_state(&mut self) -> StorageResult> { - let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); - - let last_log_id = match last { - None => None, - Some(res) => { - let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; - let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; - Some(ent.log_id) - } - }; - - let last_purged_log_id = self.get_meta::()?; - - let last_log_id = match last_log_id { - None => last_purged_log_id, - Some(x) => Some(x), - }; - - Ok(LogState { - last_purged_log_id, - last_log_id, - }) - } - async fn try_get_log_entries + Clone + Debug + Send + Sync>( &mut self, range: RB, @@ -339,6 +314,31 @@ impl RaftSnapshotBuilder for RocksStateMachine { impl RaftLogStorage for RocksLogStore { type LogReader = Self; + async fn get_log_state(&mut self) -> StorageResult> { + let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); + + let last_log_id = match last { + None => None, + Some(res) => { + let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + Some(ent.log_id) + } + }; + + let last_purged_log_id = self.get_meta::()?; + + let last_log_id = match last_log_id { + None => last_purged_log_id, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { self.put_meta::(vote)?; self.db.flush_wal(true).map_err(|e| StorageIOError::write_vote(&e))?; diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index 98daf5bb9..b54b6452b 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -48,7 +48,6 @@ use openraft::LogIdOptionExt; use openraft::MessageSummary; use openraft::Raft; use openraft::RaftLogId; -use openraft::RaftLogReader; use openraft::RaftMetrics; use openraft::RaftState; use openraft::ServerState; diff --git a/tests/tests/metrics/t10_purged.rs b/tests/tests/metrics/t10_purged.rs index aaf4740e8..defa83340 100644 --- a/tests/tests/metrics/t10_purged.rs +++ b/tests/tests/metrics/t10_purged.rs @@ -3,9 +3,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::storage::RaftLogStorage; use openraft::testing::log_id; use openraft::Config; -use openraft::RaftLogReader; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; diff --git a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs index 255ee4007..77aa5d9bb 100644 --- a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs +++ b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs @@ -21,7 +21,6 @@ use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; use openraft::Membership; -use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::ServerState; use openraft::SnapshotPolicy;