Skip to content

Commit

Permalink
Switch to unbounded channel (#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuris authored Oct 4, 2023
1 parent f1b351e commit a4c94af
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 29 deletions.
2 changes: 1 addition & 1 deletion firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl Db {
.block_nbit(params.wal_block_nbit)
.max_revisions(cfg.wal.max_revisions)
.build();
let (sender, inbound) = tokio::sync::mpsc::channel(cfg.buffer.max_buffered);
let (sender, inbound) = tokio::sync::mpsc::unbounded_channel();
let disk_requester = DiskBufferRequester::new(sender);
let buffer = cfg.buffer.clone();
let disk_thread = Some(std::thread::spawn(move || {
Expand Down
31 changes: 14 additions & 17 deletions firewood/src/storage/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ pub enum BufferCmd {
/// Config for the disk buffer.
#[derive(TypedBuilder, Clone, Debug)]
pub struct DiskBufferConfig {
/// Maximum buffered disk buffer commands.
#[builder(default = 4096)]
pub max_buffered: usize,
/// Maximum number of pending pages.
#[builder(default = 65536)] // 256MB total size by default
pub max_pending: usize,
Expand Down Expand Up @@ -111,7 +108,7 @@ impl Notifiers {
/// Responsible for processing [`BufferCmd`]s from the [`DiskBufferRequester`]
/// and managing the persistence of pages.
pub struct DiskBuffer {
inbound: mpsc::Receiver<BufferCmd>,
inbound: mpsc::UnboundedReceiver<BufferCmd>,
aiomgr: AioManager,
cfg: DiskBufferConfig,
wal_cfg: WalConfig,
Expand All @@ -120,7 +117,7 @@ pub struct DiskBuffer {
impl DiskBuffer {
/// Create a new aio managed disk buffer.
pub fn new(
inbound: mpsc::Receiver<BufferCmd>,
inbound: mpsc::UnboundedReceiver<BufferCmd>,
cfg: &DiskBufferConfig,
wal: &WalConfig,
) -> Result<Self, AioError> {
Expand Down Expand Up @@ -556,20 +553,20 @@ async fn process(
/// ```
#[derive(Clone, Debug)]
pub struct DiskBufferRequester {
sender: mpsc::Sender<BufferCmd>,
sender: mpsc::UnboundedSender<BufferCmd>,
}

impl DiskBufferRequester {
/// Create a new requester.
pub fn new(sender: mpsc::Sender<BufferCmd>) -> Self {
pub fn new(sender: mpsc::UnboundedSender<BufferCmd>) -> Self {
Self { sender }
}

/// Get a page from the buffer.
pub fn get_page(&self, space_id: SpaceId, page_id: u64) -> Option<Page> {
let (resp_tx, resp_rx) = oneshot::channel();
self.sender
.blocking_send(BufferCmd::GetPage((space_id, page_id), resp_tx))
.send(BufferCmd::GetPage((space_id, page_id), resp_tx))
.map_err(StoreError::Send)
.ok();
resp_rx.blocking_recv().unwrap()
Expand All @@ -578,19 +575,19 @@ impl DiskBufferRequester {
/// Sends a batch of writes to the buffer.
pub fn write(&self, page_batch: Vec<BufferWrite>, write_batch: AshRecord) {
self.sender
.blocking_send(BufferCmd::WriteBatch(page_batch, write_batch))
.send(BufferCmd::WriteBatch(page_batch, write_batch))
.map_err(StoreError::Send)
.ok();
}

pub fn shutdown(&self) {
self.sender.blocking_send(BufferCmd::Shutdown).ok().unwrap()
self.sender.send(BufferCmd::Shutdown).ok().unwrap()
}

/// Initialize the Wal.
pub fn init_wal(&self, waldir: &str, rootpath: &Path) {
self.sender
.blocking_send(BufferCmd::InitWal(
.send(BufferCmd::InitWal(
rootpath.to_path_buf(),
waldir.to_string(),
))
Expand All @@ -602,7 +599,7 @@ impl DiskBufferRequester {
pub fn collect_ash(&self, nrecords: usize) -> Result<Vec<AshRecord>, StoreError<RecvError>> {
let (resp_tx, resp_rx) = oneshot::channel();
self.sender
.blocking_send(BufferCmd::CollectAsh(nrecords, resp_tx))
.send(BufferCmd::CollectAsh(nrecords, resp_tx))
.map_err(StoreError::Send)
.ok();
resp_rx.blocking_recv().map_err(StoreError::Receive)
Expand All @@ -611,7 +608,7 @@ impl DiskBufferRequester {
/// Register a cached space to the buffer.
pub fn reg_cached_space(&self, space_id: SpaceId, files: Arc<FilePool>) {
self.sender
.blocking_send(BufferCmd::RegCachedSpace(space_id, files))
.send(BufferCmd::RegCachedSpace(space_id, files))
.map_err(StoreError::Send)
.ok();
}
Expand Down Expand Up @@ -665,7 +662,7 @@ mod tests {
fn test_buffer_with_undo() {
let temp_dir = get_tmp_dir();

let buf_cfg = DiskBufferConfig::builder().max_buffered(1).build();
let buf_cfg = DiskBufferConfig::builder().build();
let wal_cfg = WalConfig::builder().build();
let disk_requester = init_buffer(buf_cfg, wal_cfg);

Expand Down Expand Up @@ -740,7 +737,7 @@ mod tests {
#[test]
#[ignore = "ref: https://github.com/ava-labs/firewood/issues/45"]
fn test_buffer_with_redo() {
let buf_cfg = DiskBufferConfig::builder().max_buffered(1).build();
let buf_cfg = DiskBufferConfig::builder().build();
let wal_cfg = WalConfig::builder().build();
let disk_requester = init_buffer(buf_cfg, wal_cfg);

Expand Down Expand Up @@ -813,7 +810,7 @@ mod tests {

#[test]
fn test_multi_stores() {
let buf_cfg = DiskBufferConfig::builder().max_buffered(1).build();
let buf_cfg = DiskBufferConfig::builder().build();
let wal_cfg = WalConfig::builder().build();
let disk_requester = init_buffer(buf_cfg, wal_cfg);

Expand Down Expand Up @@ -915,7 +912,7 @@ mod tests {
}

fn init_buffer(buf_cfg: DiskBufferConfig, wal_cfg: WalConfig) -> DiskBufferRequester {
let (sender, inbound) = tokio::sync::mpsc::channel(buf_cfg.max_buffered);
let (sender, inbound) = tokio::sync::mpsc::unbounded_channel();
let disk_requester = DiskBufferRequester::new(sender);
std::thread::spawn(move || {
let disk_buffer = DiskBuffer::new(inbound, &buf_cfg, &wal_cfg).unwrap();
Expand Down
11 changes: 0 additions & 11 deletions fwdctl/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,6 @@ pub struct Options {
)]
blob_ncached_objs: usize,

/// Disk Buffer options
#[arg(
long,
required = false,
default_value_t = 4096,
value_name = "DISK_BUFFER_MAX_BUFFERED",
help = "Maximum buffered disk buffer."
)]
max_buffered: usize,

#[arg(
long,
required = false,
Expand Down Expand Up @@ -271,7 +261,6 @@ pub fn initialize_db_config(opts: &Options) -> DbConfig {
merkle_ncached_objs: opts.merkle_ncached_objs,
},
buffer: DiskBufferConfig {
max_buffered: opts.max_buffered,
max_pending: opts.max_pending,
max_aio_requests: opts.max_aio_requests,
max_aio_response: opts.max_aio_response,
Expand Down

0 comments on commit a4c94af

Please sign in to comment.