Skip to content

Commit

Permalink
Create shortcut for single-task scope
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Oct 24, 2023
1 parent 3584010 commit da75ec5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 61 deletions.
15 changes: 15 additions & 0 deletions node/libs/concurrency/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,18 @@ impl<'env, E: 'static + Send> Scope<'env, E> {
}
}
}

/// Spawns the provided blocking closure `f` and waits until it completes or the context gets canceled.
pub async fn wait_blocking<'a, R, E>(
ctx: &'a ctx::Ctx,
f: impl FnOnce() -> Result<R, E> + Send + 'a,
) -> Result<R, E>
where
R: 'static + Send,
E: 'static + From<ctx::Canceled> + Send,
{
run!(ctx, |ctx, s| async {
Ok(s.spawn_blocking(f).join(ctx).await?)
})
.await
}
86 changes: 25 additions & 61 deletions node/libs/storage/src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,10 @@ impl RocksdbStorage {
options.create_missing_column_families(true);
options.create_if_missing(true);

let db = scope::run!(ctx, |_, s| async {
Ok(s.spawn_blocking(|| {
rocksdb::DB::open(&options, path)
.context("Failed opening RocksDB")
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
let db = scope::wait_blocking(ctx, || {
rocksdb::DB::open(&options, path)
.context("Failed opening RocksDB")
.map_err(StorageError::Database)
})
.await?;

Expand Down Expand Up @@ -260,35 +256,23 @@ impl fmt::Debug for RocksdbStorage {
#[async_trait]
impl BlockStore for RocksdbStorage {
async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
scope::run!(ctx, |ctx, s| async {
Ok(
s.spawn_blocking(|| self.head_block_blocking().map_err(StorageError::Database))
.join(ctx)
.await?,
)
scope::wait_blocking(ctx, || {
self.head_block_blocking().map_err(StorageError::Database)
})
.await
}

async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult<FinalBlock> {
scope::run!(ctx, |ctx, s| async {
Ok(
s.spawn_blocking(|| self.first_block_blocking().map_err(StorageError::Database))
.join(ctx)
.await?,
)
scope::wait_blocking(ctx, || {
self.first_block_blocking().map_err(StorageError::Database)
})
.await
}

async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult<BlockNumber> {
scope::run!(ctx, |ctx, s| async {
Ok(s.spawn_blocking(|| {
self.last_contiguous_block_number_blocking()
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
scope::wait_blocking(ctx, || {
self.last_contiguous_block_number_blocking()
.map_err(StorageError::Database)
})
.await
}
Expand All @@ -298,12 +282,8 @@ impl BlockStore for RocksdbStorage {
ctx: &ctx::Ctx,
number: BlockNumber,
) -> StorageResult<Option<FinalBlock>> {
scope::run!(ctx, |ctx, s| async {
Ok(
s.spawn_blocking(|| self.block_blocking(number).map_err(StorageError::Database))
.join(ctx)
.await?,
)
scope::wait_blocking(ctx, || {
self.block_blocking(number).map_err(StorageError::Database)
})
.await
}
Expand All @@ -313,13 +293,9 @@ impl BlockStore for RocksdbStorage {
ctx: &ctx::Ctx,
range: ops::Range<BlockNumber>,
) -> StorageResult<Vec<BlockNumber>> {
scope::run!(ctx, |ctx, s| async {
Ok(s.spawn_blocking(|| {
self.missing_block_numbers_blocking(range)
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
scope::wait_blocking(ctx, || {
self.missing_block_numbers_blocking(range)
.map_err(StorageError::Database)
})
.await
}
Expand All @@ -332,13 +308,9 @@ impl BlockStore for RocksdbStorage {
#[async_trait]
impl WriteBlockStore for RocksdbStorage {
async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> {
scope::run!(ctx, |ctx, s| async {
Ok(s.spawn_blocking(|| {
self.put_block_blocking(block)
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
scope::wait_blocking(ctx, || {
self.put_block_blocking(block)
.map_err(StorageError::Database)
})
.await
}
Expand All @@ -347,13 +319,9 @@ impl WriteBlockStore for RocksdbStorage {
#[async_trait]
impl ReplicaStateStore for RocksdbStorage {
async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult<Option<ReplicaState>> {
scope::run!(ctx, |ctx, s| async {
Ok(s.spawn_blocking(|| {
self.replica_state_blocking()
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
scope::wait_blocking(ctx, || {
self.replica_state_blocking()
.map_err(StorageError::Database)
})
.await
}
Expand All @@ -363,13 +331,9 @@ impl ReplicaStateStore for RocksdbStorage {
ctx: &ctx::Ctx,
replica_state: &ReplicaState,
) -> StorageResult<()> {
scope::run!(ctx, |ctx, s| async {
Ok(s.spawn_blocking(|| {
self.put_replica_state_blocking(replica_state)
.map_err(StorageError::Database)
})
.join(ctx)
.await?)
scope::wait_blocking(ctx, || {
self.put_replica_state_blocking(replica_state)
.map_err(StorageError::Database)
})
.await
}
Expand Down

0 comments on commit da75ec5

Please sign in to comment.