Skip to content

Commit

Permalink
Wait for lock file in container chain restart (#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmpolaczyk authored May 22, 2024
1 parent c88ef1b commit 5dd5b8b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ clap = { version = "4.1.6", default-features = false, features = [ "derive" ] }
core_extensions = "1.5.3"
exit-future = { version = "0.2.0" }
flume = "0.10.9"
fs2 = "0.4.3"
futures = { version = "0.3.1" }
futures-timer = "3.0.1"
hex = { version = "0.4.3", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = { workspace = true }
clap = { workspace = true, features = [ "derive" ] }
exit-future = { workspace = true }
flume = { workspace = true }
fs2 = { workspace = true }
futures = { workspace = true }
jsonrpsee = { workspace = true, features = [ "server" ] }
log = { workspace = true }
Expand Down
111 changes: 101 additions & 10 deletions node/src/container_chain_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use {
cumulus_relay_chain_interface::RelayChainInterface,
dancebox_runtime::{AccountId, Block, BlockNumber},
dc_orchestrator_chain_interface::OrchestratorChainInterface,
fs2::FileExt,
futures::FutureExt,
node_common::{command::generate_genesis_block, service::NodeBuilderConfig},
pallet_author_noting_runtime_api::AuthorNotingApi,
Expand All @@ -45,7 +46,7 @@ use {
std::{
collections::{HashMap, HashSet},
future::Future,
path::Path,
path::{Path, PathBuf},
pin::Pin,
sync::{Arc, Mutex},
time::Instant,
Expand Down Expand Up @@ -100,6 +101,8 @@ pub struct ContainerChainSpawnerState {
pub struct ContainerChainState {
/// Handle that can be used to stop the container chain
stop_handle: StopContainerChain,
/// Database path
db_path: PathBuf,
}

/// Stops a container chain when signal is sent. The bool means `keep_db`, whether to keep the
Expand Down Expand Up @@ -283,10 +286,22 @@ impl ContainerChainSpawner {
&container_chain_cli,
container_chain_cli.base.keep_db,
)?;
// Need to add a sleep here to ensure that the partial components created in
// `open_and_maybe_delete_db` have enough time to close.
// Wait here to for the partial components created in`open_and_maybe_delete_db` to close.
// Dropping is not enough because there is some background process that keeps the database open,
// so we check the paritydb lock file directly.
log::info!("Restarting container chain {}", container_chain_para_id);
sleep(Duration::from_secs(10)).await;
let max_restart_timeout = Duration::from_secs(60);
wait_for_paritydb_lock(&db_path, max_restart_timeout)
.await
.map_err(|e| {
log::warn!(
"Error waiting for chain {} to release db lock: {:?}",
container_chain_para_id,
e
);

e
})?;
}

// Select appropiate sync mode. We want to use WarpSync unless the db still exists,
Expand Down Expand Up @@ -349,6 +364,7 @@ impl ContainerChainSpawner {
signal,
id: monitor_id,
},
db_path: db_path.clone(),
},
);
}
Expand Down Expand Up @@ -416,7 +432,10 @@ impl ContainerChainSpawner {
}

/// Stop a container chain. Prints a warning if the container chain was not running.
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) {
/// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
/// to ensure that the container chain has fully stopped. The database path can be `None` if the
/// chain was not running.
fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
let mut state = self.state.lock().expect("poison error");
let stop_handle = state
.spawned_container_chains
Expand All @@ -433,6 +452,8 @@ impl ContainerChainSpawner {

// Send signal to perform graceful shutdown, which will delete the db if needed
let _ = stop_handle.stop_handle.signal.send(keep_db);

Some(stop_handle.db_path)
}
None => {
// Do not print the warning message if this is a container chain that has failed to
Expand All @@ -443,6 +464,8 @@ impl ContainerChainSpawner {
container_chain_para_id
);
}

None
}
}
}
Expand Down Expand Up @@ -480,7 +503,7 @@ impl ContainerChainSpawner {
let HandleUpdateAssignmentResult {
chains_to_stop,
chains_to_start,
need_to_restart,
need_to_restart: _,
} = handle_update_assignment_state_change(
&mut self.state.lock().expect("poison error"),
self.orchestrator_para_id,
Expand All @@ -507,15 +530,38 @@ impl ContainerChainSpawner {
}

// Stop all container chains that are no longer needed
let mut db_paths_restart = vec![];
for para_id in chains_to_stop {
// Keep db if we are currently assigned to this chain
let keep_db = Some(para_id) == current;
self.stop(para_id, keep_db);
let maybe_db_path = self.stop(para_id, keep_db);
// If we are restarting this chain, save its db_path to check when it actually stopped
if let Some(db_path) = maybe_db_path {
if chains_to_start.contains(&para_id) {
db_paths_restart.push((para_id, db_path));
}
}
}

if need_to_restart {
// Give it some time to stop properly
sleep(Duration::from_secs(10)).await;
if !db_paths_restart.is_empty() {
// Ensure the chains we stopped actually stopped by checking if their database is unlocked.
// Using `join_all` because in one edge case we may be restarting 2 chains,
// but almost always this will be only one future.
let max_restart_timeout = Duration::from_secs(60);
let futs = db_paths_restart
.into_iter()
.map(|(para_id, db_path)| async move {
wait_for_paritydb_lock(&db_path, max_restart_timeout)
.await
.map_err(|e| {
log::warn!(
"Error waiting for chain {} to release db lock: {:?}",
para_id,
e
);
})
});
futures::future::join_all(futs).await;
}

// Start all new container chains (usually 1)
Expand All @@ -531,6 +577,7 @@ impl ContainerChainSpawner {
struct HandleUpdateAssignmentResult {
chains_to_stop: Vec<ParaId>,
chains_to_start: Vec<ParaId>,
#[allow(dead_code)] // no longer used except in tests
need_to_restart: bool,
}

Expand Down Expand Up @@ -813,6 +860,47 @@ fn parse_boot_nodes_ignore_invalid(
.collect()
}

async fn wait_for_paritydb_lock(db_path: &Path, max_timeout: Duration) -> Result<(), String> {
let now = Instant::now();

while now.elapsed() < max_timeout {
let lock_held = check_paritydb_lock_held(db_path)
.map_err(|e| format!("Failed to check if lock file is held: {}", e))?;
if !lock_held {
return Ok(());
}
sleep(Duration::from_secs(1)).await;
}

Err("Timeout when waiting for paritydb lock".to_string())
}

/// Given a path to a paritydb database, check if its lock file is held. This indicates that a
/// background process is still using the database, so we should wait before trying to open it.
///
/// This should be kept up to date with the way paritydb handles the lock file:
/// <https://github.com/paritytech/parity-db/blob/2b6820e310a08678d4540c044f41a93d87343ac8/src/db.rs#L215>
fn check_paritydb_lock_held(db_path: &Path) -> Result<bool, std::io::Error> {
if !db_path.is_dir() {
// Lock file does not exist, so it is not held
return Ok(false);
}

let mut lock_path: std::path::PathBuf = db_path.to_owned();
lock_path.push("lock");
let lock_file = std::fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(true)
.open(lock_path.as_path())?;
// Check if the lock file is busy by trying to lock it.
// Returns err if failed to adquire the lock.
let lock_held = lock_file.try_lock_exclusive().is_err();

Ok(lock_held)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -890,6 +978,8 @@ mod tests {
+ Send
+ Sync,
> = Arc::new(collate_closure);
// Dummy db_path for tests, is not actually used
let db_path = PathBuf::from(format!("/tmp/container-{}/db", container_chain_para_id));

let old = self
.state
Expand All @@ -900,6 +990,7 @@ mod tests {
container_chain_para_id,
ContainerChainState {
stop_handle: StopContainerChain { signal, id: 0 },
db_path,
},
);

Expand Down

0 comments on commit 5dd5b8b

Please sign in to comment.