Skip to content

Commit

Permalink
Fix cancellation detection with storage tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Oct 23, 2023
1 parent 1d5fc5e commit d537760
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
14 changes: 9 additions & 5 deletions node/actors/consensus/src/replica/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use storage::ReplicaStateStore;
use storage::{ReplicaStateStore, StorageError};
use tracing::{instrument, warn};

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
Expand Down Expand Up @@ -120,12 +120,16 @@ impl StateMachine {
block_proposal_cache: self.block_proposal_cache.clone(),
};

scope::run_blocking!(ctx, |ctx, s| {
let store_result = scope::run_blocking!(ctx, |ctx, s| {
let backup_future = self.storage.put_replica_state(ctx, &backup);
s.spawn(backup_future).join(ctx).block()?;
Ok(())
})
.expect("Failed backing up replica state");
// ^ We don't know how to recover from DB errors, so panicking is the only option so far.
});
match store_result {
Ok(()) => { /* Everything went fine */ }
Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"),
Err(StorageError::Database(err)) => panic!("Failed storing replica state: {err}"),
// ^ We don't know how to recover from DB errors, so panicking is the only option so far.
}
}
}
2 changes: 1 addition & 1 deletion node/actors/sync_blocks/src/peers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn test_peer_states<T: Test>(test: T) {
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
peer_states.run(ctx).await.or_else(|err| {
if err.is::<ctx::Canceled>() {
if err.root_cause().is::<ctx::Canceled>() {
Ok(()) // Swallow cancellation errors after the test is finished
} else {
Err(err)
Expand Down
2 changes: 1 addition & 1 deletion node/actors/sync_blocks/src/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async fn test_sync_blocks<T: GossipNetworkTest>(test: T) {
.unwrap_err();

tracing::trace!(?key, "Node task completed");
if err.is::<ctx::Canceled>() {
if err.root_cause().is::<ctx::Canceled>() {
Ok(()) // Test has successfully completed
} else {
Err(err)
Expand Down
4 changes: 2 additions & 2 deletions node/actors/sync_blocks/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn subscribing_to_state_updates() {
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
actor.run(ctx).await.or_else(|err| {
if err.is::<ctx::Canceled>() {
if err.root_cause().is::<ctx::Canceled>() {
Ok(()) // Swallow cancellation errors after the test is finished
} else {
Err(err)
Expand Down Expand Up @@ -221,7 +221,7 @@ async fn getting_blocks() {
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
actor.run(ctx).await.or_else(|err| {
if err.is::<ctx::Canceled>() {
if err.root_cause().is::<ctx::Canceled>() {
Ok(()) // Swallow cancellation errors after the test is finished
} else {
Err(err)
Expand Down

0 comments on commit d537760

Please sign in to comment.