Skip to content

Commit

Permalink
Use mpsc channel to parallelize I/O
Browse files (browse the repository at this point in the history)
  • Loading branch information
AbdelrahmanElawady committed Jul 22, 2024
1 parent 56a2c15 commit a8058ff
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions rfs/src/clone.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,31 +3,47 @@ use crate::{
fungi::{meta::Block, Reader, Result},
store::{BlockStore, Store},
};
use anyhow::Context;
use std::sync::Arc;
use tokio::{fs::File, io::AsyncReadExt};
use tokio::{fs::File, io::AsyncReadExt, sync::mpsc};

const WORKERS: usize = 10;
const BUFFER: usize = 10;

pub async fn clone<S: Store>(reader: Reader, store: S, cache: Cache<S>) -> Result<()> {
let downloader = BlobDownloader::new(cache);
let (tx, mut rx) = mpsc::channel(BUFFER);

let downloader = BlobDownloader::new(cache, tx);
let mut download_pool = workers::WorkerPool::new(downloader, WORKERS);

let uploader = BlobUploader::new(store.into());
let mut upload_pool = workers::WorkerPool::new(uploader, WORKERS);

let upload_handle = tokio::spawn(async move {
loop {
let file = match rx.recv().await {
Some(f) => f,
None => break,
};

let worker = upload_pool.get().await;
if let Err(err) = worker.send(file) {
log::error!("failed to schedule file upload: {:#}", err);
}
}
upload_pool.close().await
});

let blocks = reader.all_blocks().await?;
for block in blocks {
let worker = download_pool.get().await;
// we wait on output here to make sure there is something to upload
// and let the uploader run in the background.
let file = worker.run(block).await??;

let worker = upload_pool.get().await;
worker.send(file)?;
worker.send(block)?;
}

download_pool.close().await;
upload_pool.close().await;
upload_handle
.await
.context("waiting on upload workers to finish")?;

Ok(())
}
Expand All @@ -37,6 +53,7 @@ where
S: Store,
{
cache: Arc<Cache<S>>,
tx: mpsc::Sender<File>,
}

impl<S> Clone for BlobDownloader<S>
Expand All @@ -46,6 +63,7 @@ where
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
tx: self.tx.clone(),
}
}
}
Expand All @@ -54,9 +72,10 @@ impl<S> BlobDownloader<S>
where
S: Store,
{
fn new(cache: Cache<S>) -> Self {
fn new(cache: Cache<S>, tx: mpsc::Sender<File>) -> Self {
Self {
cache: Arc::new(cache),
tx,
}
}
}
Expand All @@ -67,11 +86,19 @@ where
S: Store,
{
type Input = Block;
type Output = Result<File>;
type Output = ();

async fn run(&mut self, block: Self::Input) -> Self::Output {
let (_, file) = self.cache.get(&block).await?;
Ok(file)
let file = match self.cache.get(&block).await {
Ok((_, f)) => f,
Err(err) => {
log::error!("failed to download block: {:#}", err);
return;
}
};
if let Err(err) = self.tx.send(file).await {
log::error!("failed to send file for upload: {:#}", err);
}
}
}
struct BlobUploader<S>
Expand Down

0 comments on commit a8058ff

Please sign in to comment.