Skip to content

Commit

Permalink
Add UnsubmittedRead to follow new API for read part.
Browse files Browse the repository at this point in the history
Currently, there's no unsubmitted API change in read part yet. Add the
new API following write part.

Add simple link operation test cases to use read/write submittable
operations.
  • Loading branch information
ileixe committed Jan 30, 2024
1 parent ba8a267 commit 350f637
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 59 deletions.
2 changes: 1 addition & 1 deletion examples/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn main() {

loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let (res, b) = file.read_at(buf, pos).submit().await;
let n = res.unwrap();

if n == 0 {
Expand Down
2 changes: 1 addition & 1 deletion examples/mix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {

loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let (res, b) = file.read_at(buf, pos).submit().await;
let n = res.unwrap();

if n == 0 {
Expand Down
12 changes: 5 additions & 7 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::{UnsubmittedOneshot, UnsubmittedWrite};
use crate::{UnsubmittedOneshot, UnsubmittedRead, UnsubmittedWrite};
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
Expand Down Expand Up @@ -165,7 +165,7 @@ impl File {
/// let buffer = vec![0; 10];
///
/// // Read up to 10 bytes
/// let (res, buffer) = f.read_at(buffer, 0).await;
/// let (res, buffer) = f.read_at(buffer, 0).submit().await;
/// let n = res?;
///
/// println!("The bytes: {:?}", &buffer[..n]);
Expand All @@ -176,10 +176,8 @@ impl File {
/// })
/// }
/// ```
pub async fn read_at<T: BoundedBufMut>(&self, buf: T, pos: u64) -> crate::BufResult<usize, T> {
// Submit the read operation
let op = Op::read_at(&self.fd, buf, pos).unwrap();
op.await
pub fn read_at<T: BoundedBufMut>(&self, buf: T, pos: u64) -> UnsubmittedRead<T> {
UnsubmittedOneshot::read_at(&self.fd, buf, pos)
}

/// Read some bytes at the specified offset from the file into the specified
Expand Down Expand Up @@ -417,7 +415,7 @@ impl File {
}

while buf.bytes_total() != 0 {
let (res, slice) = self.read_at(buf, pos).await;
let (res, slice) = self.read_at(buf, pos).submit().await;
match res {
Ok(0) => {
return (
Expand Down
2 changes: 1 addition & 1 deletion src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use noop::NoOp;

mod open;

mod read;
pub(crate) mod read;

mod read_fixed;

Expand Down
91 changes: 48 additions & 43 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,69 @@
use io_uring::cqueue::Entry;

use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::BufResult;
use crate::{BufResult, OneshotOutputTransform, UnsubmittedOneshot};

use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;
use std::marker::PhantomData;

/// An unsubmitted read operation.
pub type UnsubmittedRead<T> = UnsubmittedOneshot<ReadData<T>, ReadTransform<T>>;

pub(crate) struct Read<T> {
#[allow(missing_docs)]
pub struct ReadData<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
#[allow(dead_code)]
fd: SharedFd,
_fd: SharedFd,

/// Reference to the in-flight buffer.
pub(crate) buf: T,
buf: T,
}

impl<T: BoundedBufMut> Op<Read<T>> {
pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result<Op<Read<T>>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle().expect("Not in a runtime context").submit_op(
Read {
fd: fd.clone(),
buf,
},
|read| {
// Get raw buffer info
let ptr = read.buf.stable_mut_ptr();
let len = read.buf.bytes_total();
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
},
)
})
}
#[allow(missing_docs)]
pub struct ReadTransform<T> {
_phantom: PhantomData<T>,
}

impl<T> Completable for Read<T>
impl<T> OneshotOutputTransform for ReadTransform<T>
where
T: BoundedBufMut,
{
type Output = BufResult<usize, T>;
type StoredData = ReadData<T>;

fn complete(self, cqe: CqeResult) -> Self::Output {
// Convert the operation result to `usize`
let res = cqe.result.map(|v| v as usize);
// Recover the buffer
let mut buf = self.buf;

// If the operation was successful, advance the initialized cursor.
if let Ok(n) = res {
fn transform_oneshot_output(self, mut data: Self::StoredData, cqe: Entry) -> Self::Output {
let n = cqe.result();
let res = if n >= 0 {
// Safety: the kernel wrote `n` bytes to the buffer.
unsafe {
buf.set_init(n);
}
}
unsafe { data.buf.set_init(n as usize) };
Ok(n as usize)
} else {
Err(io::Error::from_raw_os_error(-n))
};

(res, data.buf)
}
}

impl<T: BoundedBufMut> UnsubmittedRead<T> {
pub(crate) fn read_at(fd: &SharedFd, mut buf: T, offset: u64) -> Self {
use io_uring::{opcode, types};

// Get raw buffer info
let ptr = buf.stable_mut_ptr();
let len = buf.bytes_total();

(res, buf)
Self::new(
ReadData {
_fd: fd.clone(),
buf,
},
ReadTransform {
_phantom: PhantomData,
},
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build(),
)
}
}
3 changes: 1 addition & 2 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ impl Socket {
}

pub(crate) async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::read_at(&self.fd, buf, 0).unwrap();
op.await
UnsubmittedOneshot::read_at(&self.fd, buf, 0).submit().await
}

pub(crate) async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
Expand Down
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! // Read some data, the buffer is passed by ownership and
//! // submitted to the kernel. When the operation completes,
//! // we get the buffer back.
//! let (res, buf) = file.read_at(buf, 0).await;
//! let (res, buf) = file.read_at(buf, 0).submit().await;
//! let n = res?;
//!
//! // Display the contents
Expand Down Expand Up @@ -78,6 +78,7 @@ pub mod buf;
pub mod fs;
pub mod net;

pub use io::read::*;
pub use io::write::*;
pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot};
pub use runtime::spawn;
Expand Down Expand Up @@ -115,7 +116,7 @@ use std::future::Future;
/// // Read some data, the buffer is passed by ownership and
/// // submitted to the kernel. When the operation completes,
/// // we get the buffer back.
/// let (res, buf) = file.read_at(buf, 0).await;
/// let (res, buf) = file.read_at(buf, 0).submit().await;
/// let n = res?;
///
/// // Display the contents
Expand Down Expand Up @@ -254,7 +255,7 @@ impl Builder {
/// // Read some data, the buffer is passed by ownership and
/// // submitted to the kernel. When the operation completes,
/// // we get the buffer back.
/// let (res, buf) = file.read_at(buf, 0).await;
/// let (res, buf) = file.read_at(buf, 0).submit().await;
/// let n = res?;
///
/// // Display the contents
Expand Down
1 change: 1 addition & 0 deletions tests/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn complete_ops_on_drop() {
},
25 * 1024 * 1024,
)
.submit()
.await
.0
.unwrap();
Expand Down
51 changes: 50 additions & 1 deletion tests/fs_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const HELLO: &[u8] = b"hello world...";

async fn read_hello(file: &File) {
let buf = Vec::with_capacity(1024);
let (res, buf) = file.read_at(buf, 0).await;
let (res, buf) = file.read_at(buf, 0).submit().await;
let n = res.unwrap();

assert_eq!(n, HELLO.len());
Expand Down Expand Up @@ -315,6 +315,55 @@ fn basic_fallocate() {
});
}

#[test]
fn write_linked() {
tokio_uring::start(async {
let tempfile = tempfile();
let file = File::create(tempfile.path()).await.unwrap();

let write1 = file
.write_at(HELLO, 0)
.set_flags(io_uring::squeue::Flags::IO_LINK)
.submit();
let write2 = file.write_at(HELLO, HELLO.len() as u64).submit();

let res1 = write1.await;
let res2 = write2.await;
res1.0.unwrap();
res2.0.unwrap();

let file = std::fs::read(tempfile.path()).unwrap();
assert_eq!(file, [HELLO, HELLO].concat());
});
}

#[test]
fn read_linked() {
tokio_uring::start(async {
let mut tempfile = tempfile();
let file = File::open(tempfile.path()).await.unwrap();

tempfile.write_all(&[HELLO, HELLO].concat()).unwrap();

let buf1 = Vec::with_capacity(HELLO.len());
let buf2 = Vec::with_capacity(HELLO.len());

let read1 = file
.read_at(buf1, 0)
.set_flags(io_uring::squeue::Flags::IO_LINK)
.submit();
let read2 = file.read_at(buf2, HELLO.len() as u64).submit();

let res1 = read1.await;
let res2 = read2.await;

res1.0.unwrap();
res2.0.unwrap();

assert_eq!([HELLO, HELLO].concat(), [res1.1, res2.1].concat());
});
}

fn tempfile() -> NamedTempFile {
NamedTempFile::new().unwrap()
}
Expand Down

0 comments on commit 350f637

Please sign in to comment.