Skip to content

Commit

Permalink
use AtomicBool instead of RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
KnowWhoami committed Nov 8, 2024
1 parent 47fb73d commit 06ca10b
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 74 deletions.
19 changes: 3 additions & 16 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub struct Maker {
/// Highest Value Fidelity Proof
pub highest_fidelity_proof: RwLock<Option<FidelityProof>>,
/// Is setup complete
pub is_setup_complete: RwLock<bool>,
pub is_setup_complete: AtomicBool,
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -217,23 +217,10 @@ impl Maker {
shutdown: AtomicBool::new(false),
connection_state: Mutex::new(HashMap::new()),
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: RwLock::new(false),
is_setup_complete: AtomicBool::new(false),
})
}

/// Triggers a shutdown event for the Maker.
pub fn shutdown(&self) -> Result<(), MakerError> {
self.shutdown.store(true, Relaxed);
Ok(())
}

/// Triggers a setup complete event for the Maker.
pub fn setup_complete(&self) -> Result<(), MakerError> {
let mut flag = self.is_setup_complete.write()?;
*flag = true;
Ok(())
}

/// Returns a reference to the Maker's wallet.
pub fn get_wallet(&self) -> &RwLock<Wallet> {
&self.wallet
Expand Down Expand Up @@ -727,7 +714,7 @@ pub fn recover_from_swap(
log::info!("Completed Wallet Sync.");
// For test, shutdown the maker at this stage.
#[cfg(feature = "integration-test")]
maker.shutdown()?;
maker.shutdown.store(true, Relaxed);
return Ok(());
}
// Sleep before next blockchain scan
Expand Down
18 changes: 10 additions & 8 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::{
io::{ErrorKind, Read, Write},
net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream},
path::{Path, PathBuf},
sync::{atomic::Ordering::Relaxed, Arc, Mutex},
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
},
thread::{self, sleep},
time::Duration,
};
Expand Down Expand Up @@ -286,7 +289,7 @@ fn setup_fidelity_bond(maker: &Arc<Maker>, maker_address: &str) -> Result<(), Ma
/// This will not block. Once Core RPC connection is live, accepting_client will set as `true` again.
fn check_connection_with_core(
maker: Arc<Maker>,
accepting_clients: Arc<Mutex<bool>>,
accepting_clients: Arc<AtomicBool>,
) -> Result<(), MakerError> {
let mut rpc_ping_success = false;
while !maker.shutdown.load(Relaxed) {
Expand All @@ -310,8 +313,7 @@ fn check_connection_with_core(
} else {
rpc_ping_success = true;
}
let mut mutex = accepting_clients.lock()?;
*mutex = rpc_ping_success;
accepting_clients.store(rpc_ping_success, Relaxed);
}

Ok(())
Expand Down Expand Up @@ -366,7 +368,7 @@ fn handle_client(
// Shutdown server if special behavior is set
MakerError::SpecialBehaviour(sp) => {
log::error!("[{}] Maker Special Behavior : {:?}", maker.config.port, sp);
maker.shutdown()?;
maker.shutdown.store(true, Relaxed);
}
e => {
log::error!(
Expand Down Expand Up @@ -417,7 +419,7 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
maker.wallet.write()?.refresh_offer_maxsize_cache()?;

// Global server Mutex, to switch on/off p2p network.
let accepting_clients = Arc::new(Mutex::new(false));
let accepting_clients = Arc::new(AtomicBool::new(false));

// Spawn Server threads.
// All thread handles are stored in the thread_pool, which are all joined at server shutdown.
Expand Down Expand Up @@ -478,7 +480,7 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
thread_pool.push(rpc_thread);

sleep(Duration::from_secs(heart_beat_interval)); // wait for 1 beat, to complete spawns of all the threads.
maker.setup_complete()?;
maker.is_setup_complete.store(true, Relaxed);
log::info!("[{}] Maker setup is ready", maker.config.port);
}

Expand All @@ -489,7 +491,7 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
let maker = maker.clone(); // This clone is needed to avoid moving the Arc<Maker> in each iterations.

// Block client connections if accepting_client=false
if !*accepting_clients.lock()? {
if !accepting_clients.load(Relaxed) {
log::debug!(
"[{}] Temporary failure in backend node. Not accepting swap request. Check your node if this error persists",
maker.config.port
Expand Down
5 changes: 0 additions & 5 deletions src/market/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ impl DirectoryServer {
addresses,
})
}

pub fn shutdown(&self) -> Result<(), DirectoryServerError> {
self.shutdown.store(true, Relaxed);
Ok(())
}
}

fn write_default_directory_config(config_path: &PathBuf) -> Result<(), DirectoryServerError> {
Expand Down
8 changes: 3 additions & 5 deletions tests/abort1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use coinswap::{
taker::{SwapParams, TakerBehavior},
utill::ConnectionType,
};

mod test_framework;
use log::{info, warn};
use std::{assert_eq, thread, time::Duration};
use std::{assert_eq, sync::atomic::Ordering::Relaxed, thread, time::Duration};
use test_framework::*;

/// Abort 1: TAKER Drops After Full Setup.
Expand Down Expand Up @@ -122,7 +121,7 @@ fn test_stop_taker_after_setup() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -195,14 +194,13 @@ fn test_stop_taker_after_setup() {
taker_thread.join().unwrap();

// Wait for Maker threads to conclude.
//makers.iter().for_each(|maker| maker.shutdown().unwrap());
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
10 changes: 6 additions & 4 deletions tests/abort2_case1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod test_framework;
use test_framework::*;

use log::{info, warn};
use std::{thread, time::Duration};
use std::{sync::atomic::Ordering::Relaxed, thread, time::Duration};

/// ABORT 2: Maker Drops Before Setup
/// This test demonstrates the situation where a Maker prematurely drops connections after doing
Expand Down Expand Up @@ -96,7 +96,7 @@ fn test_abort_case_2_move_on_with_other_makers() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -127,14 +127,16 @@ fn test_abort_case_2_move_on_with_other_makers() {
taker_thread.join().unwrap();

// Wait for Maker threads to conclude.
makers.iter().for_each(|maker| maker.shutdown().unwrap());
makers
.iter()
.for_each(|maker| maker.shutdown.store(true, Relaxed));
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
13 changes: 8 additions & 5 deletions tests/abort2_case2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use coinswap::{
taker::SwapParams,
utill::ConnectionType,
};

mod test_framework;
use test_framework::*;

use log::{info, warn};
use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration};
use std::{
fs::File, io::Read, path::PathBuf, sync::atomic::Ordering::Relaxed, thread, time::Duration,
};

/// ABORT 2: Maker Drops Before Setup
/// This test demonstrates the situation where a Maker prematurely drops connections after doing
Expand Down Expand Up @@ -114,7 +115,7 @@ fn test_abort_case_2_recover_if_no_makers_found() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -192,14 +193,16 @@ fn test_abort_case_2_recover_if_no_makers_found() {
}

// Wait for Maker threads to conclude.
makers.iter().for_each(|maker| maker.shutdown().unwrap());
makers
.iter()
.for_each(|maker| maker.shutdown.store(true, Relaxed));
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
12 changes: 8 additions & 4 deletions tests/abort2_case3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod test_framework;
use test_framework::*;

use log::{info, warn};
use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration};
use std::{
fs::File, io::Read, path::PathBuf, sync::atomic::Ordering::Relaxed, thread, time::Duration,
};

/// ABORT 2: Maker Drops Before Setup
/// This test demonstrates the situation where a Maker prematurely drops connections after doing
Expand Down Expand Up @@ -95,7 +97,7 @@ fn maker_drops_after_sending_senders_sigs() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -126,14 +128,16 @@ fn maker_drops_after_sending_senders_sigs() {
taker_thread.join().unwrap();

// Wait for Maker threads to conclude.
makers.iter().for_each(|maker| maker.shutdown().unwrap());
makers
.iter()
.for_each(|maker| maker.shutdown.store(true, Relaxed));
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
12 changes: 8 additions & 4 deletions tests/abort3_case1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod test_framework;
use test_framework::*;

use log::{info, warn};
use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration};
use std::{
fs::File, io::Read, path::PathBuf, sync::atomic::Ordering::Relaxed, thread, time::Duration,
};

/// ABORT 3: Maker Drops After Setup
/// Case 1: CloseAtContractSigsForRecvrAndSender
Expand Down Expand Up @@ -97,7 +99,7 @@ fn abort3_case1_close_at_contract_sigs_for_recvr_and_sender() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -127,14 +129,16 @@ fn abort3_case1_close_at_contract_sigs_for_recvr_and_sender() {
taker_thread.join().unwrap();

// Wait for Maker threads to conclude.
makers.iter().for_each(|maker| maker.shutdown().unwrap());
makers
.iter()
.for_each(|maker| maker.shutdown.store(true, Relaxed));
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
12 changes: 8 additions & 4 deletions tests/abort3_case2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod test_framework;
use test_framework::*;

use log::{info, warn};
use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration};
use std::{
fs::File, io::Read, path::PathBuf, sync::atomic::Ordering::Relaxed, thread, time::Duration,
};

/// ABORT 3: Maker Drops After Setup
/// Case 2: CloseAtContractSigsForRecvr
Expand Down Expand Up @@ -94,7 +96,7 @@ fn abort3_case2_close_at_contract_sigs_for_recvr() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -124,14 +126,16 @@ fn abort3_case2_close_at_contract_sigs_for_recvr() {
taker_thread.join().unwrap();

// Wait for Maker threads to conclude.
makers.iter().for_each(|maker| maker.shutdown().unwrap());
makers
.iter()
.for_each(|maker| maker.shutdown.store(true, Relaxed));
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
12 changes: 8 additions & 4 deletions tests/abort3_case3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod test_framework;
use test_framework::*;

use log::{info, warn};
use std::{fs::File, io::Read, path::PathBuf, thread, time::Duration};
use std::{
fs::File, io::Read, path::PathBuf, sync::atomic::Ordering::Relaxed, thread, time::Duration,
};

/// ABORT 3: Maker Drops After Setup
/// Case 3: CloseAtHashPreimage
Expand Down Expand Up @@ -94,7 +96,7 @@ fn abort3_case3_close_at_hash_preimage_handover() {

// Makers take time to fully setup.
makers.iter().for_each(|maker| {
while !*maker.is_setup_complete.read().unwrap() {
while !maker.is_setup_complete.load(Relaxed) {
log::info!("Waiting for maker setup completion");
// Introduce a delay of 10 seconds to prevent write lock starvation.
thread::sleep(Duration::from_secs(10));
Expand Down Expand Up @@ -124,14 +126,16 @@ fn abort3_case3_close_at_hash_preimage_handover() {
taker_thread.join().unwrap();

// Wait for Maker threads to conclude.
makers.iter().for_each(|maker| maker.shutdown().unwrap());
makers
.iter()
.for_each(|maker| maker.shutdown.store(true, Relaxed));
maker_threads
.into_iter()
.for_each(|thread| thread.join().unwrap());

// ---- After Swap checks ----

let _ = directory_server_instance.shutdown();
directory_server_instance.shutdown.store(true, Relaxed);

thread::sleep(Duration::from_secs(10));

Expand Down
Loading

0 comments on commit 06ca10b

Please sign in to comment.