Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: read state barrier #503

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
}
}
CEEvent::Reset(snapshot, finish_tx) => {
// since a reset is needed, all other vertexes doesn't matter anymore, so delete them all
// since a reset is needed, all other vertices doesn't matter anymore, so delete them all
self.entry_vid.clear();
self.vs.clear();

Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
) -> Result<FetchReadStateResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let cmd = req.cmd()?;
let state = self.curp.handle_fetch_read_state(&cmd);
let state = self.curp.handle_fetch_read_state(&cmd)?;
Ok(FetchReadStateResponse::new(state))
}
}
Expand Down
24 changes: 18 additions & 6 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,17 +720,29 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}

/// Handle `fetch_read_state`
pub(super) fn handle_fetch_read_state(&self, cmd: &C) -> ReadState {
let ids = self.ctx.sp.map_lock(|sp| {
sp.pool
pub(super) fn handle_fetch_read_state(&self, cmd: &C) -> Result<ReadState, CurpError> {
if self.st.read().role != Role::Leader {
return Err(CurpError::Internal("not leader".to_owned()));
}

let ids = {
let sp_l = self.ctx.sp.lock();
let ucp_l = self.ctx.ucp.lock();
sp_l.pool
.iter()
.filter_map(|(id, c)| c.is_conflict_with_cmd(cmd).then_some(*id))
.chain(
ucp_l
.iter()
.filter_map(|(id, c)| c.is_conflict_with_cmd(cmd).then_some(*id)),
)
.unique()
.collect_vec()
});
};
if ids.is_empty() {
ReadState::CommitIndex(self.log.read().commit_index)
Ok(ReadState::CommitIndex(self.log.read().commit_index))
} else {
ReadState::Ids(IdSet::new(ids))
Ok(ReadState::Ids(IdSet::new(ids)))
}
}
}
Expand Down
67 changes: 47 additions & 20 deletions xline/src/server/barriers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{BTreeMap, HashMap};
use std::{
cmp::Reverse,
collections::{BinaryHeap, HashMap},
};

use clippy_utilities::OverflowArithmetic;
use curp::cmd::ProposeId;
Expand All @@ -17,8 +20,9 @@ impl IndexBarrier {
pub(crate) fn new() -> Self {
IndexBarrier {
inner: Mutex::new(IndexBarrierInner {
last_trigger_index: 0,
barriers: BTreeMap::new(),
next: 1,
indices: BinaryHeap::new(),
barriers: HashMap::new(),
}),
}
}
Expand All @@ -27,7 +31,7 @@ impl IndexBarrier {
pub(crate) async fn wait(&self, index: u64) {
let listener = {
let mut inner_l = self.inner.lock();
if inner_l.last_trigger_index >= index {
if inner_l.next > index {
return;
}
inner_l
Expand All @@ -42,24 +46,31 @@ impl IndexBarrier {
/// Trigger all barriers whose index is less than or equal to the given index.
pub(crate) fn trigger(&self, index: u64) {
let mut inner_l = self.inner.lock();
if inner_l.last_trigger_index < index {
inner_l.last_trigger_index = index;
}
let mut split_barriers = inner_l.barriers.split_off(&(index.overflow_add(1)));
std::mem::swap(&mut inner_l.barriers, &mut split_barriers);
for (_, barrier) in split_barriers {
barrier.notify(usize::MAX);
inner_l.indices.push(Reverse(index));
while inner_l
.indices
.peek()
.map_or(false, |i| i.0.eq(&inner_l.next))
{
let next = inner_l.next;
let _ignore = inner_l.indices.pop();
if let Some(event) = inner_l.barriers.remove(&next) {
event.notify(usize::MAX);
}
inner_l.next = next.overflow_add(1);
}
}
}

/// Inner of index barrier.
#[derive(Debug)]
struct IndexBarrierInner {
/// The last index that the barrier has triggered.
last_trigger_index: u64,
/// Barrier of index.
barriers: BTreeMap<u64, Event>,
/// The next index that haven't been triggered
next: u64,
/// Store all indices that larger than `next`
indices: BinaryHeap<Reverse<u64>>,
/// Events
barriers: HashMap<u64, Event>,
}

/// Barrier for id
Expand Down Expand Up @@ -131,12 +142,28 @@ mod test {
#[abort_on_panic]
async fn test_index_barrier() {
let index_barrier = Arc::new(IndexBarrier::new());
let barriers = (0..5).map(|i| {
let id_barrier = Arc::clone(&index_barrier);
tokio::spawn(async move {
id_barrier.wait(i).await;
let (done_tx, done_rx) = flume::bounded(5);
let barriers = (1..=5)
.map(|i| {
let index_barrier = Arc::clone(&index_barrier);
let done_tx_c = done_tx.clone();
tokio::spawn(async move {
index_barrier.wait(i).await;
done_tx_c.send(i).unwrap();
})
})
});
.collect::<Vec<_>>();

index_barrier.trigger(2);
index_barrier.trigger(3);
sleep(Duration::from_millis(100)).await;
assert!(done_rx.try_recv().is_err());
index_barrier.trigger(1);
sleep(Duration::from_millis(100)).await;
assert_eq!(done_rx.try_recv().unwrap(), 1);
assert_eq!(done_rx.try_recv().unwrap(), 2);
assert_eq!(done_rx.try_recv().unwrap(), 3);
index_barrier.trigger(4);
index_barrier.trigger(5);

timeout(Duration::from_millis(100), index_barrier.wait(3))
Expand Down
Loading