diff --git a/node/actors/network/src/mux/tests.rs b/node/actors/network/src/mux/tests.rs
index 1d97d0f3..83986a9c 100644
--- a/node/actors/network/src/mux/tests.rs
+++ b/node/actors/network/src/mux/tests.rs
@@ -293,53 +293,57 @@ async fn test_transport_closed() {
         write_frame_size: 100,
     });
     scope::run!(ctx, |ctx, s| async {
-        let (s1, s2) = noise::testonly::pipe(ctx).await;
-        // Spawns a peer that establishes the a transient stream and then tries to read from it.
-        // Reading should terminate as soon as we close the transport.
-        s.spawn(async {
-            let mut stream = scope::run!(ctx, |ctx, s| async {
-                let mut mux = mux::Mux {
-                    cfg: cfg.clone(),
-                    accept: BTreeMap::default(),
-                    connect: BTreeMap::default(),
-                };
-                let q = mux::StreamQueue::new(1);
-                mux.connect.insert(cap, q.clone());
-                s.spawn_bg(async { expected(mux.run(ctx, s2).await).context("mux2.run()") });
-                anyhow::Ok(q.open(ctx).await.unwrap())
+        let streams = s
+            .spawn(async {
+                let (s1, s2) = noise::testonly::pipe(ctx).await;
+                // Establish a transient stream, then close the transport (s1 and s2).
+                scope::run!(ctx, |ctx, s| async {
+                    let outbound = s.spawn(async {
+                        let mut mux = mux::Mux {
+                            cfg: cfg.clone(),
+                            accept: BTreeMap::default(),
+                            connect: BTreeMap::default(),
+                        };
+                        let q = mux::StreamQueue::new(1);
+                        mux.connect.insert(cap, q.clone());
+                        s.spawn_bg(async {
+                            expected(mux.run(ctx, s2).await).context("[connect] mux.run()")
+                        });
+                        q.open(ctx).await.context("[connect] q.open()")
+                    });
+                    let inbound = s.spawn(async {
+                        let mut mux = mux::Mux {
+                            cfg: cfg.clone(),
+                            accept: BTreeMap::default(),
+                            connect: BTreeMap::default(),
+                        };
+                        let q = mux::StreamQueue::new(1);
+                        mux.accept.insert(cap, q.clone());
+                        s.spawn_bg(async {
+                            expected(mux.run(ctx, s1).await).context("[accept] mux.run()")
+                        });
+                        q.open(ctx).await.context("[accept] q.open()")
+                    });
+                    Ok([
+                        inbound.join(ctx).await.context("inbound")?,
+                        outbound.join(ctx).await.context("outbound")?,
+                    ])
+                })
+                .await
             })
-            .await
-            .unwrap();
-
+            .join(ctx)
+            .await?;
+        // Check how the streams without transport behave.
+        for mut s in streams {
             let mut buf = bytes::Buffer::new(100);
             // Read is expected to succeed, but no data should be read.
-            stream.read.read_exact(ctx, &mut buf).await.unwrap();
+            s.read.read_exact(ctx, &mut buf).await.unwrap();
             assert_eq!(buf.len(), 0);
             // Writing will succeed (thanks to buffering), but flushing should fail
             // because the transport is closed.
-            stream.write.write_all(ctx, &[1, 2, 3]).await.unwrap();
-            assert!(stream.write.flush(ctx).await.is_err());
-            anyhow::Ok(())
-        });
-
-        let mut mux = mux::Mux {
-            cfg: cfg.clone(),
-            accept: BTreeMap::default(),
-            connect: BTreeMap::default(),
-        };
-        let q = mux::StreamQueue::new(1);
-        mux.accept.insert(cap, q.clone());
-        // Accept the stream and drop the connection completely.
-        s.spawn_bg(async { expected(mux.run(ctx, s1).await).context("mux1.run()") });
-        let mut stream = q.open(ctx).await.unwrap();
-
-        let mut buf = bytes::Buffer::new(100);
-        stream.read.read_exact(ctx, &mut buf).await.unwrap();
-        // The peer multiplexer is dropped to complete `read_exact()`, so we don't have a deadlock here.
-        assert_eq!(buf.len(), 0);
-        stream.write.write_all(ctx, &[1, 2, 3]).await.unwrap();
-        assert!(stream.write.flush(ctx).await.is_err());
-
+            s.write.write_all(ctx, &[1, 2, 3]).await.unwrap();
+            assert!(s.write.flush(ctx).await.is_err());
+        }
         Ok(())
     })
     .await
diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs
index 46c30a27..8b045627 100644
--- a/node/actors/sync_blocks/src/tests/end_to_end.rs
+++ b/node/actors/sync_blocks/src/tests/end_to_end.rs
@@ -222,7 +222,7 @@ impl GossipNetwork {
 }
 
 #[async_trait]
-trait GossipNetworkTest: fmt::Debug {
+trait GossipNetworkTest: fmt::Debug + Send {
     /// Returns the number of nodes in the gossip network and number of peers for each node.
     fn network_params(&self) -> (usize, usize);
 
diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs
index 4bb49a95..19d0fbce 100644
--- a/node/actors/sync_blocks/src/tests/mod.rs
+++ b/node/actors/sync_blocks/src/tests/mod.rs
@@ -139,28 +139,30 @@ async fn subscribing_to_state_updates() {
         anyhow::Ok(())
     });
 
-    let initial_state = state_subscriber.borrow_and_update();
-    assert_eq!(
-        initial_state.first_stored_block,
-        genesis_block.justification
-    );
-    assert_eq!(
-        initial_state.last_contiguous_stored_block,
-        genesis_block.justification
-    );
-    assert_eq!(initial_state.last_stored_block, genesis_block.justification);
-    drop(initial_state);
+    {
+        let initial_state = state_subscriber.borrow_and_update();
+        assert_eq!(
+            initial_state.first_stored_block,
+            genesis_block.justification
+        );
+        assert_eq!(
+            initial_state.last_contiguous_stored_block,
+            genesis_block.justification
+        );
+        assert_eq!(initial_state.last_stored_block, genesis_block.justification);
+    }
 
     storage.put_block(ctx, &block_1).await.unwrap();
 
-    let new_state = sync::changed(ctx, &mut state_subscriber).await?;
-    assert_eq!(new_state.first_stored_block, genesis_block.justification);
-    assert_eq!(
-        new_state.last_contiguous_stored_block,
-        block_1.justification
-    );
-    assert_eq!(new_state.last_stored_block, block_1.justification);
-    drop(new_state);
+    {
+        let new_state = sync::changed(ctx, &mut state_subscriber).await?;
+        assert_eq!(new_state.first_stored_block, genesis_block.justification);
+        assert_eq!(
+            new_state.last_contiguous_stored_block,
+            block_1.justification
+        );
+        assert_eq!(new_state.last_stored_block, block_1.justification);
+    }
 
     storage.put_block(ctx, &block_3).await.unwrap();
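
Note on the braces replacing the explicit `drop(...)` calls above: `watch::Receiver::borrow_and_update()` returns a `watch::Ref` guard which holds a read lock and (in current tokio) is not `Send`, so keeping it alive across a later `.await` makes the whole test future non-`Send`. That now matters because this change adds `Send` bounds (`GossipNetworkTest: Send` above, and `Fut: Send` on `Scope::run` below); scoping the borrow in a block makes the drop structural rather than relying on a manual `drop()`. A minimal sketch of the pattern, assuming only the tokio crate:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u32);
    {
        // The Ref returned by borrow_and_update() is not Send;
        // scoping it guarantees it is dropped before the next await point.
        let state = rx.borrow_and_update();
        assert_eq!(*state, 0);
    } // guard dropped here
    tx.send(1).unwrap();
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), 1);
}
```
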
diff --git a/node/libs/concurrency/src/scope/mod.rs b/node/libs/concurrency/src/scope/mod.rs
index 312e2bb7..c7a9076d 100644
--- a/node/libs/concurrency/src/scope/mod.rs
+++ b/node/libs/concurrency/src/scope/mod.rs
@@ -45,8 +45,7 @@ mod state;
 mod task;
 
 pub use macros::*;
-use must_complete::must_complete;
-use state::{CancelGuard, State, TerminateGuard};
+use state::{CancelGuard, OrPanic, State, TerminateGuard};
 use task::{Task, Terminated};
 use tracing::Instrument as _;
 
@@ -100,6 +99,7 @@ impl<'env, T> JoinHandle<'env, T> {
     /// Awaits completion of the task.
     /// Returns the result of the task.
     /// Returns `Canceled` if the context has been canceled before the task.
+    /// Panics if the awaited task panicked.
     ///
     /// Caller is expected to provide their local context as an argument, which
     /// is neccessarily a descendant of the scope's context (because JoinHandle cannot
@@ -126,6 +126,14 @@ impl<'env, T> JoinHandle<'env, T> {
             Err(ctx::Canceled)
         })
     }
+
+    /// Unconditional join used in run/run_blocking to await the root task.
+    async fn join_raw(self) -> ctx::OrCanceled<T> {
+        match self.0.await {
+            Ok(Ok(v)) => Ok(v),
+            _ => Err(ctx::Canceled),
+        }
+    }
 }
 
 /// Scope represents a concurrent computation bounded by lifetime `'env`.
@@ -244,17 +252,13 @@ impl<'env, E: 'static + Send> Scope<'env, E> {
     /// the lifetime of the call.
     ///
     /// `root_task` is executed as a root task of this scope.
-    /// Although both blocking and async tasks can be executed
-    /// in this scope, the root task is executed inline in the caller's
-    /// thread (i.e. we don't call `tokio::spawn(root_task)`) so it has to
-    /// be async.
     ///
     // Safety:
     // - we are assuming that `run` is only called via `run!` macro
     // - <'env> is exactly equal to the lifetime of the `Scope::new(...).run(...).await` call.
     // - in particular `run(...)` cannot be assigned to a local variable, because
     //   it would reference the temporal `Scope::new(...)` object.
-    // - the returned future is wrapped in `must_complete` so it will abort
+    // - the returned future uses `must_complete::Guard` so it will abort
     //   the whole process if dropped before completion.
     // - before the first `poll()` call of the `run(...)` future may be forgotten (via `std::mem::forget`)
     //   directly or indirectly and that's safe, because no unsafe code has been executed yet.
@@ -266,22 +270,43 @@ impl<'env, E: 'static + Send> Scope<'env, E> {
     //   transitively use references stored in the root task, so they stay valid as well,
     //   until `run()` future is dropped.
     #[doc(hidden)]
-    pub fn run<T, F, Fut>(&'env mut self, root_task: F) -> impl 'env + Future<Output = Result<T, E>>
+    pub async fn run<T: 'static + Send, F, Fut>(&'env mut self, root_task: F) -> Result<T, E>
     where
         F: 'env + FnOnce(&'env ctx::Ctx, &'env Self) -> Fut,
-        Fut: 'env + Future<Output = Result<T, E>>,
+        Fut: 'env + Send + Future<Output = Result<T, E>>,
     {
-        must_complete(async move {
-            let guard = Arc::new(State::make(self.ctx.clone()));
-            self.cancel_guard = Arc::downgrade(&guard);
-            self.terminate_guard = Arc::downgrade(guard.terminate_guard());
-            let state = guard.terminate_guard().state().clone();
-            let root_res = Task::Main(guard).run(root_task(&self.ctx, self)).await;
-            // Wait for the scope termination.
-            state.terminated().await;
-            // Return the error, or the result of the root_task.
-            state.take_err().map_or_else(|| Ok(root_res.unwrap()), Err)
-        })
+        // Abort if the run() future is dropped before completion.
+        let must_complete = must_complete::Guard;
+
+        let guard = Arc::new(State::make(self.ctx.clone()));
+        self.cancel_guard = Arc::downgrade(&guard);
+        self.terminate_guard = Arc::downgrade(guard.terminate_guard());
+        let state = guard.terminate_guard().state().clone();
+        // Spawn the root task. We cannot run it directly in this task,
+        // because if the root task panicked, we wouldn't be able to
+        // wait for the other tasks to finish.
+        let root_task = self.spawn(root_task(&self.ctx, self));
+        // Once we have spawned the root task we can drop the guard.
+        drop(guard);
+        // Await the completion of the root_task.
+        let root_task_result = root_task.join_raw().await;
+        // Wait for the scope termination.
+        state.terminated().await;
+
+        // All tasks completed.
+        must_complete.defuse();
+
+        // Return the result of the root_task, the error, or propagate the panic.
+        match state.take_err() {
+            // All tasks have completed successfully, so in particular root_task has returned Ok.
+            None => Ok(root_task_result.unwrap()),
+            // One of the tasks returned an error, but no panic occurred.
+            Some(OrPanic::Err(err)) => Err(err),
+            // Note that the panic is propagated only once all of the tasks have run to completion.
+            Some(OrPanic::Panic) => {
+                panic!("one of the tasks panicked, look for a stack trace above")
+            }
+        }
     }
 
     /// not public; used by run_blocking! macro.
@@ -291,7 +316,7 @@ impl<'env, E: 'static + Send> Scope<'env, E> {
     /// task (in particular, not from async code).
     /// Behaves analogically to `run`.
     #[doc(hidden)]
-    pub fn run_blocking<T, F>(&'env mut self, root_task: F) -> Result<T, E>
+    pub fn run_blocking<T: 'static + Send, F>(&'env mut self, root_task: F) -> Result<T, E>
     where
         E: 'static + Send,
         F: 'env + FnOnce(&'env ctx::Ctx, &'env Self) -> Result<T, E>,
@@ -300,28 +325,39 @@ impl<'env, E: 'static + Send> Scope<'env, E> {
     {
         let guard = Arc::new(State::make(self.ctx.clone()));
         self.cancel_guard = Arc::downgrade(&guard);
         self.terminate_guard = Arc::downgrade(guard.terminate_guard());
         let state = guard.terminate_guard().state().clone();
-        let root_res = Task::Main(guard).run_blocking(|| root_task(&self.ctx, self));
+        // Spawn the root task. We cannot run it directly in this task,
+        // because if the root task panicked, we wouldn't be able to
+        // wait for the other tasks to finish.
+        let root_task = self.spawn_blocking(|| root_task(&self.ctx, self));
+        // Once we have spawned the root task we can drop the guard.
+        drop(guard);
+        // Await the completion of the root_task.
+        let root_task_result = ctx::block_on(root_task.join_raw());
         // Wait for the scope termination.
         ctx::block_on(state.terminated());
-        // Return the error, or the result of the root_task.
+
+        // Return the result of the root_task, the error, or propagate the panic.
         match state.take_err() {
-            Some(err) => Err(err),
-            None => Ok(root_res.unwrap()),
+            // All tasks have completed successfully, so in particular root_task has returned Ok.
+            None => Ok(root_task_result.unwrap()),
+            // One of the tasks returned an error, but no panic occurred.
+            Some(OrPanic::Err(err)) => Err(err),
+            // Note that the panic is propagated only once all of the tasks have run to completion.
+            Some(OrPanic::Panic) => {
+                panic!("one of the tasks panicked, look for a stack trace above")
+            }
         }
     }
 }
 
-/// Spawns the provided blocking closure `f` and waits until it completes or the context gets canceled.
-pub async fn wait_blocking<'a, R, E>(
-    ctx: &'a ctx::Ctx,
-    f: impl FnOnce() -> Result<R, E> + Send + 'a,
-) -> Result<R, E>
-where
-    R: 'static + Send,
-    E: 'static + From<ctx::Canceled> + Send,
-{
-    run!(ctx, |ctx, s| async {
-        Ok(s.spawn_blocking(f).join(ctx).await?)
-    })
-    .await
-}
+/// Spawns the blocking closure `f` and unconditionally awaits its completion.
+/// Panics if `f` panics.
+/// Aborts if dropped before completion.
+pub async fn wait_blocking<'a, T: 'static + Send>(f: impl 'a + Send + FnOnce() -> T) -> T {
+    let must_complete = must_complete::Guard;
+    let res = unsafe { spawn_blocking(Box::new(|| Ok(f()))) }
+        .join_raw()
+        .await;
+    must_complete.defuse();
+    res.expect("awaited task panicked")
+}
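
With the root task spawned rather than polled inline, a panic anywhere in the scope cancels the context, lets every remaining task run to completion, and only then re-panics out of `run`/`run_blocking`. A hedged sketch of the caller-visible behavior, mirroring the tests added further below (module paths as used inside the concurrency crate; not standalone code):

```rust
use crate::{ctx, scope};

#[tokio::test]
async fn demo() {
    let res = tokio::task::spawn(async {
        let ctx = &ctx::test_root(&ctx::RealClock);
        scope::run!(ctx, |ctx, s| async {
            s.spawn::<()>(async { panic!("boom") });
            // This line still runs: the scope is canceled first, and the
            // panic is propagated only after every task has completed.
            ctx.canceled().await;
            anyhow::Ok(())
        })
        .await
    })
    .await;
    // The propagated panic surfaces as a JoinError on the outer handle.
    assert!(res.is_err());

    // The new wait_blocking: no ctx argument, unconditional completion.
    assert_eq!(42, scope::wait_blocking(|| 6 * 7).await);
}
```
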
diff --git a/node/libs/concurrency/src/scope/must_complete.rs b/node/libs/concurrency/src/scope/must_complete.rs
index 821c70ce..4d112187 100644
--- a/node/libs/concurrency/src/scope/must_complete.rs
+++ b/node/libs/concurrency/src/scope/must_complete.rs
@@ -1,28 +1,25 @@
-//! must_complete wraps a future, so that it aborts if it is dropped before completion.
-//! Note that it ABORTS the process rather than just panic, so that we get a strong guarantee
-//! of completion in both `panic=abort` and `panic=unwind` compilation modes.
+//! Guard that aborts the process if dropped without being defused.
+//! Note that it ABORTS the process rather than just panicking, so it behaves consistently
+//! in both `panic=abort` and `panic=unwind` compilation modes.
 //!
+//! It should be used to prevent a future from being dropped before completion.
 //! Possibility that a future can be dropped/aborted at every await makes the control flow unnecessarily complicated.
 //! In fact, only few basic futures (like io primitives) actually need to be abortable, so
 //! that they can be put together into a tokio::select block. All the higher level logic
 //! would greatly benefit (in terms of readability and bug-resistance) from being non-abortable.
 //! Rust doesn't support linear types as of now, so best we can do is a runtime check.
-use std::future::Future;
 
-/// must_complete wraps a future, so that it aborts if it is dropped before completion.
-pub(super) fn must_complete<Fut: Future>(fut: Fut) -> impl Future<Output = Fut::Output> {
-    let guard = Guard;
-    async move {
-        let res = fut.await;
-        std::mem::forget(guard);
-        res
+/// Guard which aborts the process when dropped.
+/// Use `Guard::defuse()` to avoid aborting.
+pub(super) struct Guard;
+
+impl Guard {
+    /// Drops the guard silently, so that it doesn't abort the process.
+    pub(crate) fn defuse(self) {
+        std::mem::forget(self)
     }
 }
 
-/// Guard which aborts the process when dropped.
-/// Use std::mem::ManuallyDrop to avoid the drop call.
-struct Guard;
-
 impl Drop for Guard {
     fn drop(&mut self) {
         // We always abort here, no matter if compiled with panic=abort or panic=unwind.
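
The guard is the moral equivalent of a linear-type obligation: construct it at the top of a future that must not be abandoned, `defuse()` it on the success path, and any drop without defusing aborts the process. A self-contained sketch of the idiom (using `panic!` instead of `std::process::abort()` so it can run as an ordinary program; names here are illustrative, not the crate's API):

```rust
struct MustComplete;

impl MustComplete {
    fn defuse(self) {
        // Skip the Drop impl on the success path.
        std::mem::forget(self);
    }
}

impl Drop for MustComplete {
    fn drop(&mut self) {
        // The real Guard calls std::process::abort() here, so the check
        // holds under both panic=abort and panic=unwind.
        panic!("future dropped before completion");
    }
}

async fn atomic_step() -> u32 {
    let guard = MustComplete;
    // ...work that must not be abandoned at an .await point...
    let result = 6 * 7;
    guard.defuse();
    result
}

#[tokio::main]
async fn main() {
    assert_eq!(42, atomic_step().await);
}
```
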
diff --git a/node/libs/concurrency/src/scope/state.rs b/node/libs/concurrency/src/scope/state.rs
index f4143fb2..5133332e 100644
--- a/node/libs/concurrency/src/scope/state.rs
+++ b/node/libs/concurrency/src/scope/state.rs
@@ -14,13 +14,18 @@ use crate::{ctx, signal};
 use std::sync::{Arc, Mutex};
 
+pub(super) enum OrPanic<E> {
+    Err(E),
+    Panic,
+}
+
 /// Internal representation of the scope.
 pub(super) struct State<E> {
     /// Context of this scope.
     /// All tasks spawned in this scope are provided with this context.
     ctx: ctx::Ctx,
     /// First error returned by any task in the scope.
-    err: Mutex<Option<E>>,
+    err: Mutex<Option<OrPanic<E>>>,
     /// Signal sent once the scope is terminated.
     terminated: signal::Once,
 }
@@ -34,7 +39,7 @@ impl<E: 'static + Send> State<E> {
     /// Takes out the error from the scope.
     /// Called after scope termination to return the error to
     /// the `scope::run!` caller.
-    pub(super) fn take_err(&self) -> Option<E> {
+    pub(super) fn take_err(&self) -> Option<OrPanic<E>> {
         debug_assert!(self.terminated.try_recv());
         std::mem::take(&mut *self.err.lock().unwrap())
     }
@@ -69,15 +74,18 @@ impl<E: 'static + Send> TerminateGuard<E> {
     }
 
     /// Sets the scope error if it is not already set.
+    /// Panic overrides an error.
     /// Called by scope tasks which resulted with an error.
     /// It has a side effect of canceling the scope.
-    pub(super) fn set_err(&self, err: E) {
+    pub(super) fn set_err(&self, err: OrPanic<E>) {
         let mut m = self.0.err.lock().unwrap();
-        if m.is_some() {
-            return;
+        match (&*m, &err) {
+            // Panic overrides an error, but an error doesn't override an error.
+            (Some(OrPanic::Panic), _) | (Some(OrPanic::Err(_)), OrPanic::Err(_)) => return,
+            _ => {}
         }
-        *m = Some(err);
         self.0.ctx.cancel();
+        *m = Some(err);
     }
 }
diff --git a/node/libs/concurrency/src/scope/task.rs b/node/libs/concurrency/src/scope/task.rs
index b8f76aaa..d801058b 100644
--- a/node/libs/concurrency/src/scope/task.rs
+++ b/node/libs/concurrency/src/scope/task.rs
@@ -60,6 +60,30 @@ pub(super) enum Task<E: 'static> {
     Background(Arc<scope::TerminateGuard<E>>),
 }
 
+/// Reports a panic to the scope of the task if the task
+/// panics and unwinds.
+struct PanicReporter<E: 'static>(Option<Task<E>>);
+
+impl<E: 'static> Drop for PanicReporter<E> {
+    fn drop(&mut self) {
+        if let Some(t) = self.0.take() {
+            t.guard().set_err(scope::OrPanic::Panic);
+        }
+    }
+}
+
+impl<E: 'static> PanicReporter<E> {
+    /// Wraps a task into a PanicReporter.
+    pub(crate) fn new(t: Task<E>) -> Self {
+        Self(Some(t))
+    }
+    /// Silently drops the PanicReporter without reporting a panic.
+    /// Returns the owned task.
+    pub(crate) fn defuse(mut self) -> Task<E> {
+        self.0.take().unwrap()
+    }
+}
+
 impl<E: 'static> Task<E> {
     /// Getter of the guard owned by the task.
     fn guard(&self) -> &scope::TerminateGuard<E> {
@@ -80,10 +104,13 @@ impl<E: 'static> Task<E> {
     pub(super) async fn run<T>(
         self,
         f: impl Future<Output = Result<T, E>>,
     ) -> Result<T, Terminated> {
-        match f.await {
+        let panic_reporter = PanicReporter::new(self);
+        let res = f.await;
+        let this = panic_reporter.defuse();
+        match res {
             Ok(v) => Ok(v),
             Err(err) => {
-                self.guard().set_err(err);
+                this.guard().set_err(scope::OrPanic::Err(err));
                 Err(Terminated)
             }
         }
@@ -92,10 +119,13 @@ impl<E: 'static> Task<E> {
 
     /// Runs an sync blocking task in the scope. MUST be executed on a dedicated thread.
     /// See `Task::run` for behavior. See module docs for desciption of blocking tasks.
     pub(super) fn run_blocking<T>(self, f: impl FnOnce() -> Result<T, E>) -> Result<T, Terminated> {
-        match f() {
+        let panic_reporter = PanicReporter::new(self);
+        let res = f();
+        let this = panic_reporter.defuse();
+        match res {
             Ok(v) => Ok(v),
             Err(err) => {
-                self.guard().set_err(err);
+                this.guard().set_err(scope::OrPanic::Err(err));
                 Err(Terminated)
             }
         }
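
`PanicReporter` is a drop flag: `Task::run` wraps the task in it before polling the user future, and `defuse()` is reached only if that future returns normally. If it unwinds instead, the reporter is dropped during unwinding and records `OrPanic::Panic`, which `set_err` lets override an already-stored error (never the reverse), so `run!` re-panics rather than returning `Err`. A self-contained sketch of the idiom with illustrative names, independent of the crate:

```rust
use std::panic;
use std::sync::atomic::{AtomicBool, Ordering};

struct PanicReporter<'a>(Option<&'a AtomicBool>);

impl<'a> PanicReporter<'a> {
    fn new(flag: &'a AtomicBool) -> Self {
        Self(Some(flag))
    }
    fn defuse(mut self) {
        self.0.take(); // normal completion: nothing to report
    }
}

impl Drop for PanicReporter<'_> {
    fn drop(&mut self) {
        if let Some(flag) = self.0.take() {
            // Reached only when defuse() was never called,
            // i.e. while unwinding from a panic.
            flag.store(true, Ordering::Relaxed);
        }
    }
}

fn main() {
    let panicked = AtomicBool::new(false);
    // Happy path: defused, nothing recorded.
    PanicReporter::new(&panicked).defuse();
    assert!(!panicked.load(Ordering::Relaxed));
    // Unwinding path: the Drop impl observes the panic.
    let res = panic::catch_unwind(|| {
        let _reporter = PanicReporter::new(&panicked);
        panic!("boom"); // defuse() is never reached on this path
    });
    assert!(res.is_err());
    assert!(panicked.load(Ordering::Relaxed));
}
```
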
diff --git a/node/libs/concurrency/src/scope/tests.rs b/node/libs/concurrency/src/scope/tests.rs
index 2e3e06f4..a2a8f76e 100644
--- a/node/libs/concurrency/src/scope/tests.rs
+++ b/node/libs/concurrency/src/scope/tests.rs
@@ -1,10 +1,24 @@
 use crate::{ctx, scope, testonly};
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
 
 // run a trivial future until completion => OK
 #[tokio::test]
 async fn must_complete_ok() {
-    assert_eq!(5, scope::must_complete(async move { 5 }).await);
+    scope::must_complete::Guard.defuse();
+}
+
+#[tokio::test]
+async fn test_wait_blocking_ok() {
+    assert_eq!(5, scope::wait_blocking(|| 5).await);
+}
+
+#[tokio::test]
+#[should_panic]
+async fn test_wait_blocking_panic() {
+    scope::wait_blocking(|| panic!("inner panic")).await;
 }
 
 type R = Result<(), usize>;
@@ -119,8 +133,7 @@ fn test_access_to_vars_outside_of_scope() {
     testonly::with_runtimes(|| async {
         // Lifetime of `a` is larger than scope's lifetime,
        // so it should be accessible from scope's tasks.
-        let a = AtomicU64::new(0);
-        let a = &a;
+        let a = &AtomicU64::new(0);
         let ctx = &ctx::test_root(&ctx::RealClock);
         scope::run!(ctx, |ctx, s| async {
             s.spawn_blocking(|| {
@@ -154,3 +167,90 @@ fn test_access_to_vars_outside_of_scope() {
         assert_eq!(6, a.load(Ordering::Relaxed));
     });
 }
+
+#[tokio::test]
+async fn test_wait_for_completion_after_panic_blocking() {
+    const N: u64 = 10;
+    let a = Arc::new(AtomicU64::new(0));
+    let a2 = a.clone();
+    let res = tokio::task::spawn_blocking(|| {
+        let ctx = &ctx::test_root(&ctx::RealClock);
+        let a = a2;
+        scope::run_blocking!(ctx, |ctx, s| {
+            for _ in 0..N {
+                s.spawn(async {
+                    ctx.canceled().await;
+                    a.fetch_add(1, Ordering::Relaxed);
+                    Ok(())
+                });
+            }
+            s.spawn::<()>(async {
+                panic!("yolo");
+            });
+            for _ in 0..N {
+                s.spawn_blocking(|| {
+                    ctx.canceled().block();
+                    a.fetch_add(1, Ordering::Relaxed);
+                    Ok(())
+                });
+            }
+            anyhow::Ok(())
+        })
+    })
+    .await;
+    assert!(res.is_err());
+    assert_eq!(2 * N, a.load(Ordering::Relaxed));
+}
+
+#[tokio::test]
+async fn test_wait_for_completion_after_panic() {
+    const N: u64 = 10;
+    let a = Arc::new(AtomicU64::new(0));
+    let a2 = a.clone();
+    let res = tokio::task::spawn(async {
+        let ctx = &ctx::test_root(&ctx::RealClock);
+        let a = a2;
+        scope::run!(ctx, |ctx, s| async {
+            for _ in 0..N {
+                s.spawn(async {
+                    ctx.canceled().await;
+                    a.fetch_add(1, Ordering::Relaxed);
+                    Ok(())
+                });
+            }
+            s.spawn::<()>(async {
+                panic!("yolo");
+            });
+            for _ in 0..N {
+                s.spawn_blocking(|| {
+                    ctx.canceled().block();
+                    a.fetch_add(1, Ordering::Relaxed);
+                    Ok(())
+                });
+            }
+            anyhow::Ok(())
+        })
+        .await
+    })
+    .await;
+    assert!(res.is_err());
+    assert_eq!(2 * N, a.load(Ordering::Relaxed));
+}
+
+#[tokio::test]
+async fn test_panic_overrides_error() {
+    let res = tokio::task::spawn(async {
+        let ctx = &ctx::test_root(&ctx::RealClock);
+        let res: anyhow::Result<()> = scope::run!(ctx, |ctx, s| async {
+            s.spawn::<()>(async {
+                anyhow::bail!("simple error");
+            });
+            ctx.canceled().await;
+            panic!("overriding the error")
+        })
+        .await;
+        res
+    })
+    .await;
+    assert!(res.is_err());
+}
diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs
index d2f1d43d..a343ecf4 100644
--- a/node/libs/storage/src/rocksdb.rs
+++ b/node/libs/storage/src/rocksdb.rs
@@ -53,7 +53,7 @@ impl RocksdbStorage {
         options.create_missing_column_families(true);
         options.create_if_missing(true);
 
-        let db = scope::wait_blocking(ctx, || {
+        let db = scope::wait_blocking(|| {
             rocksdb::DB::open(&options, path)
                 .context("Failed opening RocksDB")
                 .map_err(StorageError::Database)
@@ -255,22 +255,16 @@ impl fmt::Debug for RocksdbStorage {
 
 #[async_trait]
 impl BlockStore for RocksdbStorage {
-    async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
-        scope::wait_blocking(ctx, || {
-            self.head_block_blocking().map_err(StorageError::Database)
-        })
-        .await
+    async fn head_block(&self, _ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
+        scope::wait_blocking(|| self.head_block_blocking().map_err(StorageError::Database)).await
     }
 
-    async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
-        scope::wait_blocking(ctx, || {
-            self.first_block_blocking().map_err(StorageError::Database)
-        })
-        .await
+    async fn first_block(&self, _ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
+        scope::wait_blocking(|| self.first_block_blocking().map_err(StorageError::Database)).await
     }
 
-    async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult<BlockNumber> {
-        scope::wait_blocking(ctx, || {
+    async fn last_contiguous_block_number(&self, _ctx: &ctx::Ctx) -> StorageResult<BlockNumber> {
+        scope::wait_blocking(|| {
             self.last_contiguous_block_number_blocking()
                 .map_err(StorageError::Database)
         })
@@ -279,21 +273,18 @@ impl BlockStore for RocksdbStorage {
 
     async fn block(
         &self,
-        ctx: &ctx::Ctx,
+        _ctx: &ctx::Ctx,
         number: BlockNumber,
     ) -> StorageResult<Option<FinalBlock>> {
-        scope::wait_blocking(ctx, || {
-            self.block_blocking(number).map_err(StorageError::Database)
-        })
-        .await
+        scope::wait_blocking(|| self.block_blocking(number).map_err(StorageError::Database)).await
     }
 
     async fn missing_block_numbers(
         &self,
-        ctx: &ctx::Ctx,
+        _ctx: &ctx::Ctx,
         range: ops::Range<BlockNumber>,
     ) -> StorageResult<Vec<BlockNumber>> {
-        scope::wait_blocking(ctx, || {
+        scope::wait_blocking(|| {
             self.missing_block_numbers_blocking(range)
                 .map_err(StorageError::Database)
         })
@@ -307,8 +298,8 @@ impl BlockStore for RocksdbStorage {
 
 #[async_trait]
 impl WriteBlockStore for RocksdbStorage {
-    async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> {
-        scope::wait_blocking(ctx, || {
+    async fn put_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> {
+        scope::wait_blocking(|| {
             self.put_block_blocking(block)
                 .map_err(StorageError::Database)
         })
@@ -318,8 +309,8 @@ impl WriteBlockStore for RocksdbStorage {
 
 #[async_trait]
 impl ReplicaStateStore for RocksdbStorage {
-    async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult<Option<ReplicaState>> {
-        scope::wait_blocking(ctx, || {
+    async fn replica_state(&self, _ctx: &ctx::Ctx) -> StorageResult<Option<ReplicaState>> {
+        scope::wait_blocking(|| {
             self.replica_state_blocking()
                 .map_err(StorageError::Database)
        })
@@ -328,10 +319,10 @@ impl ReplicaStateStore for RocksdbStorage {
 
     async fn put_replica_state(
         &self,
-        ctx: &ctx::Ctx,
+        _ctx: &ctx::Ctx,
         replica_state: &ReplicaState,
     ) -> StorageResult<()> {
-        scope::wait_blocking(ctx, || {
+        scope::wait_blocking(|| {
             self.put_replica_state_blocking(replica_state)
                 .map_err(StorageError::Database)
         })
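
The `ctx` parameters above become `_ctx` deliberately: with the new `wait_blocking`, a storage call that has started is always driven to completion, and cancellation is the caller's concern, checked between calls rather than inside them. A self-contained model of that behavioral difference (tokio only; names illustrative, and simplified to a `'static` closure so it fits `tokio::task::spawn_blocking`):

```rust
use tokio::task;

// Models the new wait_blocking: join the blocking task unconditionally,
// propagating a panic instead of racing against context cancellation.
async fn wait_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> T {
    task::spawn_blocking(f).await.expect("blocking task panicked")
}

#[tokio::main]
async fn main() {
    // Even if the caller intends to cancel, a write that has started
    // still completes; cancellation must be observed before the call.
    let written = wait_blocking(|| {
        // stand-in for a blocking RocksDB write
        true
    })
    .await;
    assert!(written);
}
```
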