From a8058ff56944c4651553988ea5c41427f27aa7c3 Mon Sep 17 00:00:00 2001 From: AbdelrahmanElawady Date: Mon, 22 Jul 2024 17:51:27 +0300 Subject: [PATCH] Use mpsc channel to parallelize I/O --- rfs/src/clone.rs | 53 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/rfs/src/clone.rs b/rfs/src/clone.rs index 504b60f..8c64851 100644 --- a/rfs/src/clone.rs +++ b/rfs/src/clone.rs @@ -3,31 +3,47 @@ use crate::{ fungi::{meta::Block, Reader, Result}, store::{BlockStore, Store}, }; +use anyhow::Context; use std::sync::Arc; -use tokio::{fs::File, io::AsyncReadExt}; +use tokio::{fs::File, io::AsyncReadExt, sync::mpsc}; const WORKERS: usize = 10; +const BUFFER: usize = 10; pub async fn clone(reader: Reader, store: S, cache: Cache) -> Result<()> { - let downloader = BlobDownloader::new(cache); + let (tx, mut rx) = mpsc::channel(BUFFER); + + let downloader = BlobDownloader::new(cache, tx); let mut download_pool = workers::WorkerPool::new(downloader, WORKERS); let uploader = BlobUploader::new(store.into()); let mut upload_pool = workers::WorkerPool::new(uploader, WORKERS); + let upload_handle = tokio::spawn(async move { + loop { + let file = match rx.recv().await { + Some(f) => f, + None => break, + }; + + let worker = upload_pool.get().await; + if let Err(err) = worker.send(file) { + log::error!("failed to schedule file upload: {:#}", err); + } + } + upload_pool.close().await + }); + let blocks = reader.all_blocks().await?; for block in blocks { let worker = download_pool.get().await; - // we wait on output here to make sure there is something to upload - // and let the uploader run in the background. - let file = worker.run(block).await??; - - let worker = upload_pool.get().await; - worker.send(file)?; + worker.send(block)?; } download_pool.close().await; - upload_pool.close().await; + upload_handle + .await + .context("waiting on upload workers to finish")?; Ok(()) } @@ -37,6 +53,7 @@ where S: Store, { cache: Arc>, + tx: mpsc::Sender, } impl Clone for BlobDownloader @@ -46,6 +63,7 @@ where fn clone(&self) -> Self { Self { cache: self.cache.clone(), + tx: self.tx.clone(), } } } @@ -54,9 +72,10 @@ impl BlobDownloader where S: Store, { - fn new(cache: Cache) -> Self { + fn new(cache: Cache, tx: mpsc::Sender) -> Self { Self { cache: Arc::new(cache), + tx, } } } @@ -67,11 +86,19 @@ where S: Store, { type Input = Block; - type Output = Result; + type Output = (); async fn run(&mut self, block: Self::Input) -> Self::Output { - let (_, file) = self.cache.get(&block).await?; - Ok(file) + let file = match self.cache.get(&block).await { + Ok((_, f)) => f, + Err(err) => { + log::error!("failed to download block: {:#}", err); + return; + } + }; + if let Err(err) = self.tx.send(file).await { + log::error!("failed to send file for upload: {:#}", err); + } } } struct BlobUploader