Skip to content

Commit

Permalink
Box over vec for io (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuris authored Feb 20, 2024
1 parent b3f6e28 commit f7d7414
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
4 changes: 2 additions & 2 deletions firewood/src/db/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl Proposal {

// schedule writes to the disk
rev_inner.disk_requester.write(
vec![
Box::new([
BufferWrite {
space_id: store.merkle.payload.id(),
delta: merkle_payload_redo,
Expand All @@ -241,7 +241,7 @@ impl Proposal {
space_id: rev_inner.root_hash_staging.id(),
delta: root_hash_redo,
},
],
]),
AshRecord(
[
(MERKLE_META_SPACE, merkle_meta_wal),
Expand Down
26 changes: 14 additions & 12 deletions firewood/src/storage/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ use tokio::{
};
use typed_builder::TypedBuilder;

type BufferWrites = Box<[BufferWrite]>;

#[derive(Debug)]
pub enum BufferCmd {
/// Initialize the Wal.
InitWal(PathBuf, String),
/// Process a write batch against the underlying store.
WriteBatch(Vec<BufferWrite>, AshRecord),
WriteBatch(BufferWrites, AshRecord),
/// Get a page from the disk buffer.
GetPage((SpaceId, u64), oneshot::Sender<Option<Page>>),
CollectAsh(usize, oneshot::Sender<Vec<AshRecord>>),
Expand Down Expand Up @@ -343,7 +345,7 @@ async fn run_wal_queue(
wal: Rc<Mutex<WalWriter<WalFileImpl, WalStoreImpl>>>,
pending: Rc<RefCell<HashMap<(SpaceId, u64), PendingPage>>>,
file_pools: Rc<RefCell<[Option<Arc<FilePool>>; 255]>>,
mut writes: mpsc::Receiver<(Vec<BufferWrite>, AshRecord)>,
mut writes: mpsc::Receiver<(BufferWrites, AshRecord)>,
fc_notifier: Rc<Notify>,
aiomgr: Rc<AioManager>,
) {
Expand All @@ -356,14 +358,14 @@ async fn run_wal_queue(

if let Some((bw, ac)) = writes.recv().await {
records.push(ac);
bwrites.extend(bw);
bwrites.extend(bw.into_vec());
} else {
break;
}

while let Ok((bw, ac)) = writes.try_recv() {
records.push(ac);
bwrites.extend(bw);
bwrites.extend(bw.into_vec());

if records.len() >= max.batch {
break;
Expand Down Expand Up @@ -467,8 +469,8 @@ async fn process(
wal_cfg: &WalConfig,
req: BufferCmd,
max: WalQueueMax,
wal_in: mpsc::Sender<(Vec<BufferWrite>, AshRecord)>,
writes: &mut Option<mpsc::Receiver<(Vec<BufferWrite>, AshRecord)>>,
wal_in: mpsc::Sender<(BufferWrites, AshRecord)>,
writes: &mut Option<mpsc::Receiver<(BufferWrites, AshRecord)>>,
) -> bool {
match req {
BufferCmd::Shutdown => return false,
Expand Down Expand Up @@ -590,7 +592,7 @@ impl DiskBufferRequester {
}

/// Sends a batch of writes to the buffer.
pub fn write(&self, page_batch: Vec<BufferWrite>, write_batch: AshRecord) {
pub fn write(&self, page_batch: BufferWrites, write_batch: AshRecord) {
self.sender
.send(BufferCmd::WriteBatch(page_batch, write_batch))
.map_err(StoreError::Send)
Expand Down Expand Up @@ -808,10 +810,10 @@ mod tests {
// page is not yet persisted to disk.
assert!(disk_requester.get_page(STATE_SPACE, 0).is_none());
disk_requester.write(
vec![BufferWrite {
Box::new([BufferWrite {
space_id: STATE_SPACE,
delta: redo_delta,
}],
}]),
AshRecord([(STATE_SPACE, wal)].into()),
);

Expand Down Expand Up @@ -943,7 +945,7 @@ mod tests {
disk_requester
}

fn create_batches(rev_mut: &StoreRevMut) -> (Vec<BufferWrite>, AshRecord) {
fn create_batches(rev_mut: &StoreRevMut) -> (BufferWrites, AshRecord) {
let deltas = std::mem::replace(
&mut *rev_mut.deltas.write(),
StoreRevMutDelta {
Expand All @@ -959,10 +961,10 @@ mod tests {
}
pages.sort_by_key(|p| p.0);

let page_batch = vec![BufferWrite {
let page_batch = Box::new([BufferWrite {
space_id: STATE_SPACE,
delta: StoreDelta(pages),
}];
}]);

let write_batch = AshRecord([(STATE_SPACE, deltas.plain)].into());
(page_batch, write_batch)
Expand Down

0 comments on commit f7d7414

Please sign in to comment.