From da75ec5369a71b0e6e8524903ad81029f2c395ef Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 24 Oct 2023 15:44:18 +0300 Subject: [PATCH] Create shortcut for single-task scope --- node/libs/concurrency/src/scope/mod.rs | 15 +++++ node/libs/storage/src/rocksdb.rs | 86 ++++++++------------------ 2 files changed, 40 insertions(+), 61 deletions(-) diff --git a/node/libs/concurrency/src/scope/mod.rs b/node/libs/concurrency/src/scope/mod.rs index c883e211..312e2bb7 100644 --- a/node/libs/concurrency/src/scope/mod.rs +++ b/node/libs/concurrency/src/scope/mod.rs @@ -310,3 +310,18 @@ impl<'env, E: 'static + Send> Scope<'env, E> { } } } + +/// Spawns the provided blocking closure `f` and waits until it completes or the context gets canceled. +pub async fn wait_blocking<'a, R, E>( + ctx: &'a ctx::Ctx, + f: impl FnOnce() -> Result + Send + 'a, +) -> Result +where + R: 'static + Send, + E: 'static + From + Send, +{ + run!(ctx, |ctx, s| async { + Ok(s.spawn_blocking(f).join(ctx).await?) + }) + .await +} diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index 43d0cd9e..cb62418f 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -53,14 +53,10 @@ impl RocksdbStorage { options.create_missing_column_families(true); options.create_if_missing(true); - let db = scope::run!(ctx, |_, s| async { - Ok(s.spawn_blocking(|| { - rocksdb::DB::open(&options, path) - .context("Failed opening RocksDB") - .map_err(StorageError::Database) - }) - .join(ctx) - .await?) + let db = scope::wait_blocking(ctx, || { + rocksdb::DB::open(&options, path) + .context("Failed opening RocksDB") + .map_err(StorageError::Database) }) .await?; @@ -260,35 +256,23 @@ impl fmt::Debug for RocksdbStorage { #[async_trait] impl BlockStore for RocksdbStorage { async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { - scope::run!(ctx, |ctx, s| async { - Ok( - s.spawn_blocking(|| self.head_block_blocking().map_err(StorageError::Database)) - .join(ctx) - .await?, - ) + scope::wait_blocking(ctx, || { + self.head_block_blocking().map_err(StorageError::Database) }) .await } async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { - scope::run!(ctx, |ctx, s| async { - Ok( - s.spawn_blocking(|| self.first_block_blocking().map_err(StorageError::Database)) - .join(ctx) - .await?, - ) + scope::wait_blocking(ctx, || { + self.first_block_blocking().map_err(StorageError::Database) }) .await } async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { - scope::run!(ctx, |ctx, s| async { - Ok(s.spawn_blocking(|| { - self.last_contiguous_block_number_blocking() - .map_err(StorageError::Database) - }) - .join(ctx) - .await?) + scope::wait_blocking(ctx, || { + self.last_contiguous_block_number_blocking() + .map_err(StorageError::Database) }) .await } @@ -298,12 +282,8 @@ impl BlockStore for RocksdbStorage { ctx: &ctx::Ctx, number: BlockNumber, ) -> StorageResult> { - scope::run!(ctx, |ctx, s| async { - Ok( - s.spawn_blocking(|| self.block_blocking(number).map_err(StorageError::Database)) - .join(ctx) - .await?, - ) + scope::wait_blocking(ctx, || { + self.block_blocking(number).map_err(StorageError::Database) }) .await } @@ -313,13 +293,9 @@ impl BlockStore for RocksdbStorage { ctx: &ctx::Ctx, range: ops::Range, ) -> StorageResult> { - scope::run!(ctx, |ctx, s| async { - Ok(s.spawn_blocking(|| { - self.missing_block_numbers_blocking(range) - .map_err(StorageError::Database) - }) - .join(ctx) - .await?) + scope::wait_blocking(ctx, || { + self.missing_block_numbers_blocking(range) + .map_err(StorageError::Database) }) .await } @@ -332,13 +308,9 @@ impl BlockStore for RocksdbStorage { #[async_trait] impl WriteBlockStore for RocksdbStorage { async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { - scope::run!(ctx, |ctx, s| async { - Ok(s.spawn_blocking(|| { - self.put_block_blocking(block) - .map_err(StorageError::Database) - }) - .join(ctx) - .await?) + scope::wait_blocking(ctx, || { + self.put_block_blocking(block) + .map_err(StorageError::Database) }) .await } @@ -347,13 +319,9 @@ impl WriteBlockStore for RocksdbStorage { #[async_trait] impl ReplicaStateStore for RocksdbStorage { async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult> { - scope::run!(ctx, |ctx, s| async { - Ok(s.spawn_blocking(|| { - self.replica_state_blocking() - .map_err(StorageError::Database) - }) - .join(ctx) - .await?) + scope::wait_blocking(ctx, || { + self.replica_state_blocking() + .map_err(StorageError::Database) }) .await } @@ -363,13 +331,9 @@ impl ReplicaStateStore for RocksdbStorage { ctx: &ctx::Ctx, replica_state: &ReplicaState, ) -> StorageResult<()> { - scope::run!(ctx, |ctx, s| async { - Ok(s.spawn_blocking(|| { - self.put_replica_state_blocking(replica_state) - .map_err(StorageError::Database) - }) - .join(ctx) - .await?) + scope::wait_blocking(ctx, || { + self.put_replica_state_blocking(replica_state) + .map_err(StorageError::Database) }) .await }