Skip to content

Commit

Permalink
Fix respond race condition and add test for it
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Jun 14, 2024
1 parent 40df056 commit 0a75cda
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
12 changes: 8 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,14 @@ impl<'i, Rq, Rp> Responder<'i, Rq, Rp> {
unsafe {
self.with_data_mut(|i| *i = Message::from_rp(response));
}
self.channel
.state
.store(State::Responded as u8, Ordering::Release);
Ok(())
if self
.channel
.transition(State::BuildingResponse, State::Responded)
{
Ok(())
} else {
Err(Error)
}
} else {
Err(Error)
}
Expand Down
41 changes: 27 additions & 14 deletions tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use interchange::{Channel, Requester, Responder};
use std::sync::atomic::Ordering::Acquire;
use std::sync::atomic::{AtomicBool, Ordering::Release};

static BRANCHES_USED: [AtomicBool; 11] = {
static BRANCHES_USED: [AtomicBool; 14] = {
#[allow(clippy::declare_interior_mutable_const)]
const ATOMIC_BOOL_INIT: AtomicBool = AtomicBool::new(false);
[ATOMIC_BOOL_INIT; 11]
[ATOMIC_BOOL_INIT; 14]
};

#[cfg(loom)]
Expand Down Expand Up @@ -51,14 +51,24 @@ fn test_function() {

fn requester_thread(mut requester: Requester<'static, u64, u64>) -> Option<()> {
requester.request(53).unwrap();
match requester.cancel() {
Ok(Some(53) | None) => {
BRANCHES_USED[0].store(true, Release);
return None;
}
Ok(_) => panic!("Invalid state"),
Err(_) => {
BRANCHES_USED[1].store(true, Release);
}
}
requester
.with_response(|r| {
BRANCHES_USED[0].store(true, Release);
BRANCHES_USED[2].store(true, Release);
assert_eq!(*r, 63)
})
.ok()
.or_else(|| {
BRANCHES_USED[1].store(true, Release);
BRANCHES_USED[3].store(true, Release);
None
})?;
requester.with_response(|r| assert_eq!(*r, 63)).unwrap();
Expand All @@ -67,44 +77,47 @@ fn requester_thread(mut requester: Requester<'static, u64, u64>) -> Option<()> {
requester.send_request().unwrap();
thread::yield_now();
match requester.cancel() {
Ok(Some(51) | None) => BRANCHES_USED[2].store(true, Release),
Ok(Some(51) | None) => BRANCHES_USED[4].store(true, Release),
Ok(_) => panic!("Invalid state"),
Err(_) => {
BRANCHES_USED[3].store(true, Release);
BRANCHES_USED[5].store(true, Release);
match requester.take_response() {
Some(i) => {
assert_eq!(i, 79);
BRANCHES_USED[4].store(true, Release);
BRANCHES_USED[6].store(true, Release);
}
None => BRANCHES_USED[5].store(true, Release),
None => BRANCHES_USED[7].store(true, Release),
}
}
}
BRANCHES_USED[6].store(true, Release);
BRANCHES_USED[8].store(true, Release);
None
}

fn responder_thread(mut responder: Responder<'static, u64, u64>) -> Option<()> {
let req = responder.take_request().or_else(|| {
BRANCHES_USED[7].store(true, Release);
BRANCHES_USED[9].store(true, Release);
None
})?;
assert_eq!(req, 53);
responder.respond(req + 10).unwrap();
responder.respond(req + 10).ok().or_else(|| {
BRANCHES_USED[10].store(true, Release);
None
})?;
thread::yield_now();
responder
.with_request(|r| {
BRANCHES_USED[8].store(true, Release);
BRANCHES_USED[11].store(true, Release);
assert_eq!(*r, 51)
})
.map(|_| assert!(responder.with_request(|_| {}).is_err()))
.or_else(|_| {
BRANCHES_USED[9].store(true, Release);
BRANCHES_USED[12].store(true, Release);
responder.acknowledge_cancel()
})
.ok()?;
responder.with_response_mut(|r| *r = 79).ok();
responder.send_response().ok();
BRANCHES_USED[10].store(true, Release);
BRANCHES_USED[13].store(true, Release);
None
}

0 comments on commit 0a75cda

Please sign in to comment.