diff --git a/.github/workflows/ci.sh b/.github/workflows/ci.sh old mode 100755 new mode 100644 diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index 485b9a2..822f15b 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -9,6 +9,7 @@ use crate::driver; pub(crate) mod close; pub(crate) mod read; +pub(crate) mod write; mod accept; mod connect; @@ -17,8 +18,6 @@ mod open; mod poll; mod recv; mod send; -mod write; - #[cfg(unix)] mod statx; diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index 4342dbf..4640b97 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -42,6 +42,10 @@ read_result! { Read { buf }, ReadAt { buf }, ReadVec { buf_vec }, +} + +#[cfg(not(windows))] +read_result! { ReadVecAt { buf_vec }, } diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index 7d87426..364f781 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -1,7 +1,11 @@ use std::io; -#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +#[cfg(unix)] use std::os::unix::prelude::AsRawFd; +#[cfg(windows)] +use std::os::windows::io::AsRawHandle; +#[cfg(any(feature = "legacy", feature = "poll-io"))] +pub(crate) use impls::*; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] @@ -15,7 +19,76 @@ use crate::{ BufResult, }; +macro_rules! write_result { + ($($name:ident<$T:ident : $Trait:ident> { $buf:ident }), * $(,)?) => { + $( + impl<$T: $Trait> super::Op<$name<$T>> { + pub(crate) async fn result(self) -> BufResult { + let complete = self.await; + (complete.meta.result.map(|v| v as _), complete.data.$buf) + } + } + )* + }; +} + +write_result! { + Write { buf }, + WriteAt { buf }, + WriteVec { buf_vec }, +} + +#[cfg(not(windows))] +write_result! { + WriteVecAt { buf_vec }, +} + pub(crate) struct Write { + fd: SharedFd, + pub(crate) buf: T, +} + +impl Op> { + pub(crate) fn write(fd: SharedFd, buf: T) -> io::Result { + Op::submit_with(Write { fd, buf }) + } +} + +impl OpAble for Write { + #[cfg(all(target_os = "linux", feature = "iouring"))] + fn uring_op(&mut self) -> io_uring::squeue::Entry { + // Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + // + // If `offset` is set to `-1`, the offset will use (and advance) the file position, like + // the write(2) system calls + opcode::Write::new( + types::Fd(self.fd.as_raw_fd()), + self.buf.read_ptr(), + self.buf.bytes_init() as _, + ) + .offset(-1i64 as _) + .build() + } + + #[cfg(any(feature = "legacy", feature = "poll-io"))] + #[inline] + fn legacy_interest(&self) -> Option<(crate::driver::ready::Direction, usize)> { + self.fd + .registered_index() + .map(|idx| (Direction::Write, idx)) + } + + #[cfg(any(feature = "legacy", feature = "poll-io"))] + fn legacy_call(&mut self) -> io::Result { + #[cfg(windows)] + let fd = self.fd.as_raw_handle() as _; + #[cfg(unix)] + let fd = self.fd.as_raw_fd(); + write(fd, self.buf.read_ptr(), self.buf.bytes_init()) + } +} + +pub(crate) struct WriteAt { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. #[allow(unused)] @@ -28,30 +101,13 @@ pub(crate) struct Write { pub(crate) buf: T, } -impl Op> { - pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { - Op::submit_with(Write { - fd: fd.clone(), - offset, - buf, - }) - } - - pub(crate) fn write(fd: SharedFd, buf: T) -> io::Result>> { - Op::submit_with(Write { - fd, - offset: -1i64 as u64, - buf, - }) - } - - pub(crate) async fn result(self) -> BufResult { - let complete = self.await; - (complete.meta.result.map(|v| v as _), complete.data.buf) +impl Op> { + pub(crate) fn write_at(fd: SharedFd, buf: T, offset: u64) -> io::Result>> { + Op::submit_with(WriteAt { fd, offset, buf }) } } -impl OpAble for Write { +impl OpAble for WriteAt { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { opcode::Write::new( @@ -71,75 +127,14 @@ impl OpAble for Write { .map(|idx| (Direction::Write, idx)) } - #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { - use crate::syscall_u32; - + #[cfg(windows)] + let fd = self.fd.as_raw_handle() as _; + #[cfg(unix)] let fd = self.fd.as_raw_fd(); - let mut seek_offset = -1; - - if -1i64 as u64 != self.offset { - seek_offset = libc::off_t::try_from(self.offset) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - } - - if seek_offset == -1 { - syscall_u32!(write(fd, self.buf.read_ptr() as _, self.buf.bytes_init())) - } else { - syscall_u32!(pwrite( - fd, - self.buf.read_ptr() as _, - self.buf.bytes_init(), - seek_offset as _ - )) - } - } - - #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { - use windows_sys::Win32::{ - Foundation::{GetLastError, ERROR_HANDLE_EOF}, - System::IO::OVERLAPPED, - }; - - let fd = self.fd.raw_handle() as _; - let seek_offset = self.offset; - let mut bytes_write = 0; - - let ret = unsafe { - // see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-readfile - if seek_offset as i64 != -1 { - let mut overlapped: OVERLAPPED = std::mem::zeroed(); - overlapped.Anonymous.Anonymous.Offset = seek_offset as u32; // Lower 32 bits of the offset - overlapped.Anonymous.Anonymous.OffsetHigh = (seek_offset >> 32) as u32; // Higher 32 bits of the offset - - WriteFile( - fd, - self.buf.read_ptr(), - self.buf.bytes_init() as u32, - &mut bytes_write, - &overlapped as *const _ as *mut _, - ) - } else { - WriteFile( - fd, - self.buf.read_ptr(), - self.buf.bytes_init() as u32, - &mut bytes_write, - std::ptr::null_mut(), - ) - } - }; - - if ret == TRUE { - return Ok(bytes_write); - } - - match unsafe { GetLastError() } { - ERROR_HANDLE_EOF => Ok(bytes_write), - error => Err(io::Error::from_raw_os_error(error as _)), - } + write_at(fd, self.buf.read_ptr(), self.buf.bytes_init(), self.offset) } } @@ -147,35 +142,20 @@ pub(crate) struct WriteVec { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. fd: SharedFd, - /// Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. - /// - /// If `offset` is set to `-1`, the offset will use (and advance) the file position, like - /// the writev(2) system calls. - offset: u64, pub(crate) buf_vec: T, } impl Op> { pub(crate) fn writev(fd: SharedFd, buf_vec: T) -> io::Result { - Op::submit_with(WriteVec { - fd, - offset: -1i64 as u64, - buf_vec, - }) + Op::submit_with(WriteVec { fd, buf_vec }) } pub(crate) fn writev_raw(fd: &SharedFd, buf_vec: T) -> WriteVec { WriteVec { fd: fd.clone(), - offset: -1i64 as u64, buf_vec, } } - - pub(crate) async fn result(self) -> BufResult { - let complete = self.await; - (complete.meta.result.map(|v| v as _), complete.data.buf_vec) - } } impl OpAble for WriteVec { @@ -183,8 +163,12 @@ impl OpAble for WriteVec { fn uring_op(&mut self) -> io_uring::squeue::Entry { let ptr = self.buf_vec.read_iovec_ptr() as *const _; let len = self.buf_vec.read_iovec_len() as _; + // Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + // + // If `offset` is set to `-1`, the offset will use (and advance) the file position, like + // the writev(2) system calls opcode::Writev::new(types::Fd(self.fd.raw_fd()), ptr, len) - .offset(self.offset) + .offset(-1i64 as u64) .build() } @@ -196,48 +180,178 @@ impl OpAble for WriteVec { .map(|idx| (Direction::Write, idx)) } - #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] + #[cfg(any(feature = "legacy", feature = "poll-io"))] + fn legacy_call(&mut self) -> io::Result { + #[cfg(windows)] + let fd = self.fd.as_raw_handle() as _; + #[cfg(unix)] + let fd = self.fd.as_raw_fd(); + + let (buf_vec, len) = { + #[cfg(unix)] + { + (self.buf_vec.read_iovec_ptr(), self.buf_vec.read_iovec_len()) + } + #[cfg(windows)] + { + ( + self.buf_vec.read_wsabuf_ptr(), + self.buf_vec.read_wsabuf_len(), + ) + } + }; + + write_vectored(fd, buf_vec, len) + } +} + +#[cfg(not(windows))] +pub(crate) struct WriteVecAt { + fd: SharedFd, + /// Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + /// + /// If `offset` is set to `-1`, the offset will use (and advance) the file position, like + /// the writev(2) system calls. + offset: u64, + buf_vec: T, +} + +#[cfg(not(windows))] +impl OpAble for WriteVecAt { + #[cfg(all(target_os = "linux", feature = "iouring"))] + fn uring_op(&mut self) -> io_uring::squeue::Entry { + opcode::Writev::new( + types::Fd(libc::AT_FDCWD), + self.buf_vec.read_iovec_ptr(), + self.buf_vec.read_iovec_len() as _, + ) + .offset(self.offset) + .build() + } + + #[cfg(any(feature = "legacy", feature = "poll-io"))] + fn legacy_interest(&self) -> Option<(crate::driver::ready::Direction, usize)> { + self.fd + .registered_index() + .map(|idx| (Direction::Write, idx)) + } + + #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { - use crate::syscall_u32; + write_vectored_at( + self.fd.raw_fd(), + self.buf_vec.read_iovec_ptr(), + self.buf_vec.read_iovec_len(), + self.offset, + ) + } +} + +#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] +pub(crate) mod impls { + use libc::iovec; - let fd = self.fd.raw_fd(); - let mut seek_offset = -1; + use super::*; + use crate::syscall_u32; - if -1i64 as u64 != self.offset { - seek_offset = libc::off_t::try_from(self.offset) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + /// A wrapper of [`libc::write`] + pub(crate) fn write(fd: i32, buf: *const u8, len: usize) -> io::Result { + syscall_u32!(write(fd, buf as _, len)) + } + + /// A wrapper of [`libc::write`] + pub(crate) fn write_at(fd: i32, buf: *const u8, len: usize, offset: u64) -> io::Result { + let offset = libc::off_t::try_from(offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + + syscall_u32!(pwrite(fd, buf as _, len, offset)) + } + + /// A wrapper of [`libc::writev`] + pub(crate) fn write_vectored(fd: i32, buf_vec: *const iovec, len: usize) -> io::Result { + syscall_u32!(writev(fd, buf_vec as _, len as _)) + } + + /// A wrapper of [`libc::pwritev`] + pub(crate) fn write_vectored_at( + fd: i32, + buf_vec: *const iovec, + len: usize, + offset: u64, + ) -> io::Result { + let offset = libc::off_t::try_from(offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + + syscall_u32!(pwritev(fd, buf_vec as _, len as _, offset)) + } +} + +#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] +pub(crate) mod impls { + use windows_sys::Win32::{ + Foundation::{GetLastError, ERROR_HANDLE_EOF}, + Networking::WinSock::WSABUF, + System::IO::OVERLAPPED, + }; + + use super::*; + + /// A wrapper of [`windows_sys::Win32::Storage::FileSystem::WriteFile`] + pub(crate) fn write(fd: isize, buf: *const u8, len: usize) -> io::Result { + let mut bytes_write = 0; + + let ret = unsafe { WriteFile(fd, buf, len as _, &mut bytes_write, std::ptr::null_mut()) }; + if ret == TRUE { + return Ok(bytes_write); } - if seek_offset == -1 { - syscall_u32!(writev( - fd, - self.buf_vec.read_iovec_ptr(), - self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _ - )) - } else { - syscall_u32!(pwritev( - fd, - self.buf_vec.read_iovec_ptr(), - self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _, - seek_offset - )) + match unsafe { GetLastError() } { + ERROR_HANDLE_EOF => Ok(bytes_write), + error => Err(io::Error::from_raw_os_error(error as _)), } } - #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] - fn legacy_call(&mut self) -> io::Result { - // There is no `writev` like syscall of file on windows, but this will be used to send - // socket message. + /// A wrapper of [`windows_sys::Win32::Storage::FileSystem::WriteFile`], + /// using [`windows_sys::Win32::System::IO::OVERLAPPED`] to write at specific offset. + pub(crate) fn write_at(fd: isize, buf: *const u8, len: usize, offset: u64) -> io::Result { + let mut bytes_write = 0; + + let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() }; + overlapped.Anonymous.Anonymous.Offset = offset as u32; // Lower 32 bits of the offset + overlapped.Anonymous.Anonymous.OffsetHigh = (offset >> 32) as u32; // Higher 32 bits of the offset + + let ret = unsafe { + WriteFile( + fd, + buf, + len as _, + &mut bytes_write, + &overlapped as *const _ as *mut _, + ) + }; + + if ret == TRUE { + return Ok(bytes_write); + } + + match unsafe { GetLastError() } { + ERROR_HANDLE_EOF => Ok(bytes_write), + error => Err(io::Error::from_raw_os_error(error as _)), + } + } + /// There is no `writev` like syscall of file on windows, but this will be used to send socket + /// message. + pub(crate) fn write_vectored(fd: usize, buf_vec: *const WSABUF, len: usize) -> io::Result { use windows_sys::Win32::Networking::WinSock::{WSAGetLastError, WSASend, WSAESHUTDOWN}; let mut bytes_sent = 0; let ret = unsafe { WSASend( - self.fd.raw_socket() as _, - self.buf_vec.read_wsabuf_ptr(), - self.buf_vec.read_wsabuf_len() as _, + fd, + buf_vec, + len as _, &mut bytes_sent, 0, std::ptr::null_mut(), diff --git a/monoio/src/fs/file/mod.rs b/monoio/src/fs/file/mod.rs index 4244376..0f5a29b 100644 --- a/monoio/src/fs/file/mod.rs +++ b/monoio/src/fs/file/mod.rs @@ -289,8 +289,7 @@ impl File { } async fn write(&mut self, buf: T) -> crate::BufResult { - let op = Op::write(self.fd.clone(), buf).unwrap(); - op.result().await + file_impl::write(self.fd.clone(), buf).await } async fn write_vectored(&mut self, buf_vec: T) -> crate::BufResult { @@ -356,8 +355,7 @@ impl File { /// /// [`Ok(n)`]: Ok pub async fn write_at(&self, buf: T, pos: u64) -> crate::BufResult { - let op = Op::write_at(&self.fd, buf, pos).unwrap(); - op.result().await + file_impl::write_at(self.fd.clone(), buf, pos).await } /// Attempts to write an entire buffer into this file at the specified diff --git a/monoio/src/fs/file/unix.rs b/monoio/src/fs/file/unix.rs index 88f5718..82c3753 100644 --- a/monoio/src/fs/file/unix.rs +++ b/monoio/src/fs/file/unix.rs @@ -1,115 +1,101 @@ -use std::{ - fs::File as StdFile, - io, - os::fd::{AsRawFd, IntoRawFd, RawFd}, -}; - -#[cfg(all(not(feature = "iouring"), feature = "sync"))] -pub(crate) use asyncified::*; -#[cfg(any(feature = "iouring", not(feature = "sync")))] -pub(crate) use iouring::*; - -use super::File; -use crate::{ - buf::{IoBufMut, IoVecBuf, IoVecBufMut}, - driver::{op::Op, shared_fd::SharedFd}, - fs::{metadata::FileAttr, Metadata}, -}; - -impl File { - /// Converts a [`std::fs::File`] to a [`monoio::fs::File`](File). - /// - /// # Examples - /// - /// ```no_run - /// // This line could block. It is not recommended to do this on the monoio - /// // runtime. - /// let std_file = std::fs::File::open("foo.txt").unwrap(); - /// let file = monoio::fs::File::from_std(std_file); - /// ``` - pub fn from_std(std: StdFile) -> std::io::Result { - Ok(File { - fd: SharedFd::new_without_register(std.into_raw_fd()), - }) - } - - /// Queries metadata about the underlying file. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> std::io::Result<()> { - /// let mut f = File::open("foo.txt").await?; - /// let metadata = f.metadata().await?; - /// Ok(()) - /// } - /// ``` - pub async fn metadata(&self) -> io::Result { - metadata(self.fd.clone()).await - } -} - -impl AsRawFd for File { - fn as_raw_fd(&self) -> RawFd { - self.fd.raw_fd() - } -} - -pub(crate) async fn write_vectored( - fd: SharedFd, - buf_vec: T, -) -> crate::BufResult { - let op = Op::writev(fd, buf_vec).unwrap(); - op.result().await -} - -pub(crate) async fn metadata(fd: SharedFd) -> std::io::Result { - #[cfg(target_os = "linux")] - let flags = libc::AT_STATX_SYNC_AS_STAT | libc::AT_EMPTY_PATH; - #[cfg(target_os = "linux")] - let op = Op::statx_using_fd(fd, flags)?; - #[cfg(target_os = "macos")] - let op = Op::statx_using_fd(fd, true)?; - - op.result().await.map(FileAttr::from).map(Metadata) -} - -#[cfg(any(feature = "iouring", not(feature = "sync")))] -mod iouring { - use super::*; - - pub(crate) async fn read(fd: SharedFd, buf: T) -> crate::BufResult { - let op = Op::read(fd, buf).unwrap(); - op.result().await - } - - pub(crate) async fn read_at( - fd: SharedFd, - buf: T, - pos: u64, - ) -> crate::BufResult { - let op = Op::read_at(fd, buf, pos).unwrap(); - op.result().await - } - - pub(crate) async fn read_vectored( - fd: SharedFd, - buf_vec: T, - ) -> crate::BufResult { - let op = Op::readv(fd, buf_vec).unwrap(); - op.result().await - } -} - -#[cfg(all(not(feature = "iouring"), feature = "sync"))] -mod asyncified { - use super::*; - use crate::{asyncify_op, driver::op::read}; - - asyncify_op!(read(read::read, IoBufMut::write_ptr, IoBufMut::bytes_total)); - asyncify_op!(read_at(read::read_at, IoBufMut::write_ptr, IoBufMut::bytes_total, pos: u64)); - asyncify_op!(read_vectored(read::read_vectored, IoVecBufMut::write_iovec_ptr, IoVecBufMut::write_iovec_len)); -} +use std::{ + fs::File as StdFile, + io, + os::fd::{AsRawFd, IntoRawFd, RawFd}, +}; + +#[cfg(all(not(feature = "iouring"), feature = "sync"))] +pub(crate) use asyncified::*; +#[cfg(any(feature = "iouring", not(feature = "sync")))] +pub(crate) use iouring::*; + +use super::File; +use crate::{ + buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, + driver::{op::Op, shared_fd::SharedFd}, + fs::{metadata::FileAttr, Metadata}, +}; + +impl File { + /// Converts a [`std::fs::File`] to a [`monoio::fs::File`](File). + /// + /// # Examples + /// + /// ```no_run + /// // This line could block. It is not recommended to do this on the monoio + /// // runtime. + /// let std_file = std::fs::File::open("foo.txt").unwrap(); + /// let file = monoio::fs::File::from_std(std_file); + /// ``` + pub fn from_std(std: StdFile) -> std::io::Result { + Ok(File { + fd: SharedFd::new_without_register(std.into_raw_fd()), + }) + } + + /// Queries metadata about the underlying file. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> std::io::Result<()> { + /// let mut f = File::open("foo.txt").await?; + /// let metadata = f.metadata().await?; + /// Ok(()) + /// } + /// ``` + pub async fn metadata(&self) -> io::Result { + metadata(self.fd.clone()).await + } +} + +impl AsRawFd for File { + fn as_raw_fd(&self) -> RawFd { + self.fd.raw_fd() + } +} + +pub(crate) async fn metadata(fd: SharedFd) -> std::io::Result { + #[cfg(target_os = "linux")] + let flags = libc::AT_STATX_SYNC_AS_STAT | libc::AT_EMPTY_PATH; + #[cfg(target_os = "linux")] + let op = Op::statx_using_fd(fd, flags)?; + #[cfg(target_os = "macos")] + let op = Op::statx_using_fd(fd, true)?; + + op.result().await.map(FileAttr::from).map(Metadata) +} + +#[cfg(any(feature = "iouring", not(feature = "sync")))] +mod iouring { + use super::*; + use crate::uring_op; + + uring_op!(read(read, buf)); + uring_op!(read_at(read_at, buf, pos: u64)); + uring_op!(read_vectored(readv, buf_vec)); + + uring_op!(write(write, buf)); + uring_op!(write_at(write_at, buf, pos: u64)); + uring_op!(write_vectored(writev, buf_vec)); +} + +#[cfg(all(not(feature = "iouring"), feature = "sync"))] +mod asyncified { + use super::*; + use crate::{ + asyncify_op, + driver::op::{read, write}, + }; + + asyncify_op!(R, read(read::read, IoBufMut::write_ptr, IoBufMut::bytes_total)); + asyncify_op!(R, read_at(read::read_at, IoBufMut::write_ptr, IoBufMut::bytes_total, pos: u64)); + asyncify_op!(R, read_vectored(read::read_vectored, IoVecBufMut::write_iovec_ptr, IoVecBufMut::write_iovec_len)); + + asyncify_op!(W, write(write::write, IoBuf::read_ptr, IoBuf::bytes_init)); + asyncify_op!(W, write_at(write::write_at, IoBuf::read_ptr, IoBuf::bytes_init, pos: u64)); + asyncify_op!(W, write_vectored(write::write_vectored, IoVecBuf::read_iovec_ptr, IoVecBuf::read_iovec_len)); +} diff --git a/monoio/src/fs/file/windows.rs b/monoio/src/fs/file/windows.rs index 2dc19a1..59e33ae 100644 --- a/monoio/src/fs/file/windows.rs +++ b/monoio/src/fs/file/windows.rs @@ -1,209 +1,250 @@ -use std::{ - mem::ManuallyDrop, - os::windows::io::{AsRawHandle, RawHandle}, -}; - -#[cfg(all(not(feature = "iouring"), feature = "sync"))] -pub(crate) use asyncified::*; -#[cfg(any(feature = "iouring", not(feature = "sync")))] -pub(crate) use blocking::*; -use windows_sys::Win32::Networking::WinSock::WSABUF; - -use super::File; -use crate::{ - buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, - driver::{op::Op, shared_fd::SharedFd}, -}; - -impl AsRawHandle for File { - fn as_raw_handle(&self) -> RawHandle { - self.fd.raw_handle() - } -} - -pub(crate) async fn write(fd: SharedFd, buf: T) -> crate::BufResult { - let op = Op::write(fd, buf).unwrap(); - op.result().await -} - -/// The `writev` implement on windows -/// -/// Due to windows does not have syscall like `writev`, so we need to simulate it by ourself. -/// -/// This function is just to write each buffer into file by calling the `write` function. -pub(crate) async fn write_vectored( - fd: SharedFd, - buf_vec: T, -) -> crate::BufResult { - // Convert the buffer vector into raw pointers that can be used in unsafe operations - let raw_bufs = buf_vec.read_wsabuf_ptr() as *mut WSABUF; - let len = buf_vec.read_wsabuf_len(); - - // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with ManuallyDrop - let wsabufs = ManuallyDrop::new(unsafe { Vec::from_raw_parts(raw_bufs, len, len) }); - let mut total_bytes_write = 0; - - // Iterate through each WSABUF structure and write data from it - for wsabuf in wsabufs.iter() { - // Safely create a Vec from the WSABUF pointer, then pass it to the write function - let (res, _) = write( - fd.clone(), - ManuallyDrop::new(unsafe { - Vec::from_raw_parts(wsabuf.buf, wsabuf.len as usize, wsabuf.len as usize) - }), - ) - .await; - - // Handle the result of the write operation - match res { - Ok(bytes_write) => { - total_bytes_write += bytes_write; - // If fewer bytes were written than requested, stop further writes - if bytes_write < wsabuf.len as usize { - break; - } - } - Err(e) => { - // If an error occurs, return it along with the original buffer vector - return (Err(e), buf_vec); - } - } - } - - // Return the total bytes written and the buffer vector - (Ok(total_bytes_write), buf_vec) -} - -#[cfg(any(feature = "iouring", not(feature = "sync")))] -mod blocking { - use super::*; - - pub(crate) async fn read(fd: SharedFd, buf: T) -> crate::BufResult { - let op = Op::read(fd, buf).unwrap(); - op.result().await - } - - pub(crate) async fn read_at( - fd: SharedFd, - buf: T, - pos: u64, - ) -> crate::BufResult { - let op = Op::read_at(fd, buf, pos).unwrap(); - op.result().await - } - - /// The `readv` implement on windows. - /// - /// Due to windows does not have syscall like `readv`, so we need to simulate it by ourself. - /// - /// This function is just to fill each buffer by calling the `read` function. - pub(crate) async fn read_vectored( - fd: SharedFd, - mut buf_vec: T, - ) -> crate::BufResult { - // Convert the mutable buffer vector into raw pointers that can be used in unsafe operations - let raw_bufs = buf_vec.write_wsabuf_ptr(); - let len = buf_vec.write_wsabuf_len(); - - // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with ManuallyDrop - let wasbufs = ManuallyDrop::new(unsafe { Vec::from_raw_parts(raw_bufs, len, len) }); - - let mut total_bytes_read = 0; - - // Iterate through each WSABUF structure and read data into it - for wsabuf in wasbufs.iter() { - // Safely create a Vec from the WSABUF pointer, then pass it to the read function - let (res, _) = read( - fd.clone(), - ManuallyDrop::new(unsafe { - Vec::from_raw_parts(wsabuf.buf, wsabuf.len as usize, wsabuf.len as usize) - }), - ) - .await; - - // Handle the result of the read operation - match res { - Ok(bytes_read) => { - total_bytes_read += bytes_read; - // If fewer bytes were read than requested, stop further reads - if bytes_read < wsabuf.len as usize { - break; - } - } - Err(e) => { - // If an error occurs, return it along with the original buffer vector - return (Err(e), buf_vec); - } - } - } - - // Due to `read` will init each buffer, so we do need to set buffer len here. - // Return the total bytes read and the buffer vector - (Ok(total_bytes_read), buf_vec) - } -} - -#[cfg(all(not(feature = "iouring"), feature = "sync"))] -mod asyncified { - use super::*; - use crate::{asyncify_op, driver::op::read, fs::asyncify}; - - asyncify_op!(read(read::read, IoBufMut::write_ptr, IoBufMut::bytes_total)); - asyncify_op!(read_at(read::read_at, IoBufMut::write_ptr, IoBufMut::bytes_total, pos: u64)); - - /// The `readv` implement on windows. - /// - /// Due to windows does not have syscall like `readv`, so we need to simulate it by ourself. - /// - /// This function is just to fill each buffer by calling the `read` function. - pub(crate) async fn read_vectored( - fd: SharedFd, - mut buf_vec: T, - ) -> crate::BufResult { - // Convert the mutable buffer vector into raw pointers that can be used in unsafe - // operations - let raw_bufs = buf_vec.write_wsabuf_ptr() as usize; - let len = buf_vec.write_wsabuf_len(); - let fd = fd.as_raw_handle() as _; - - let res = asyncify(move || { - // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with - // ManuallyDrop - let wasbufs = ManuallyDrop::new(unsafe { - Vec::from_raw_parts(raw_bufs as *mut WSABUF, len, len) - }); - - let mut total_bytes_read = 0; - - // Iterate through each WSABUF structure and read data into it - for wsabuf in wasbufs.iter() { - let res = read::read(fd, wsabuf.buf, wsabuf.len as usize); - - // Handle the result of the read operation - match res { - Ok(bytes_read) => { - total_bytes_read += bytes_read; - // If fewer bytes were read than requested, stop further reads - if bytes_read < wsabuf.len { - break; - } - } - Err(e) => { - // If an error occurs, return it along with the original buffer vector - return Err(e); - } - } - } - - // Due to `read` will init each buffer, so we do need to set buffer len here. - // Return the total bytes read and the buffer vector - Ok(total_bytes_read) - }) - .await - .map(|n| n as usize); - - unsafe { buf_vec.set_init(*res.as_ref().unwrap_or(&0)) }; - - (res, buf_vec) - } -} +use std::{ + mem::ManuallyDrop, + os::windows::io::{AsRawHandle, RawHandle}, +}; + +#[cfg(all(not(feature = "iouring"), feature = "sync"))] +pub(crate) use asyncified::*; +#[cfg(any(feature = "iouring", not(feature = "sync")))] +pub(crate) use blocking::*; +use windows_sys::Win32::Networking::WinSock::WSABUF; + +use super::File; +use crate::{ + buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, + driver::{op::Op, shared_fd::SharedFd}, +}; + +impl AsRawHandle for File { + fn as_raw_handle(&self) -> RawHandle { + self.fd.raw_handle() + } +} + +#[cfg(any(feature = "iouring", not(feature = "sync")))] +mod blocking { + use super::*; + use crate::uring_op; + + uring_op!(read(read, buf)); + uring_op!(read_at(read_at, buf, pos: u64)); + + uring_op!(write(write, buf)); + uring_op!(write_at(write_at, buf, pos: u64)); + + /// The `readv` implement on windows. + /// + /// Due to windows does not have syscall like `readv`, so we need to simulate it by ourself. + /// + /// This function is just to fill each buffer by calling the `read` function. + pub(crate) async fn read_vectored( + fd: SharedFd, + mut buf_vec: T, + ) -> crate::BufResult { + // Convert the mutable buffer vector into raw pointers that can be used in unsafe operations + let raw_bufs = buf_vec.write_wsabuf_ptr(); + let len = buf_vec.write_wsabuf_len(); + + // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with ManuallyDrop + let wasbufs = ManuallyDrop::new(unsafe { Vec::from_raw_parts(raw_bufs, len, len) }); + + let mut total_bytes_read = 0; + + // Iterate through each WSABUF structure and read data into it + for wsabuf in wasbufs.iter() { + // Safely create a Vec from the WSABUF pointer, then pass it to the read function + let (res, _) = read( + fd.clone(), + ManuallyDrop::new(unsafe { + Vec::from_raw_parts(wsabuf.buf, wsabuf.len as usize, wsabuf.len as usize) + }), + ) + .await; + + // Handle the result of the read operation + match res { + Ok(bytes_read) => { + total_bytes_read += bytes_read; + // If fewer bytes were read than requested, stop further reads + if bytes_read < wsabuf.len as usize { + break; + } + } + Err(e) => { + // If an error occurs, return it along with the original buffer vector + return (Err(e), buf_vec); + } + } + } + + // Due to `read` will init each buffer, so we do need to set buffer len here. + // Return the total bytes read and the buffer vector + (Ok(total_bytes_read), buf_vec) + } + + /// The `writev` implement on windows + /// + /// Due to windows does not have syscall like `writev`, so we need to simulate it by ourself. + /// + /// This function is just to write each buffer into file by calling the `write` function. + pub(crate) async fn write_vectored( + fd: SharedFd, + buf_vec: T, + ) -> crate::BufResult { + // Convert the buffer vector into raw pointers that can be used in unsafe operations + let raw_bufs = buf_vec.read_wsabuf_ptr() as *mut WSABUF; + let len = buf_vec.read_wsabuf_len(); + + // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with ManuallyDrop + let wsabufs = ManuallyDrop::new(unsafe { Vec::from_raw_parts(raw_bufs, len, len) }); + let mut total_bytes_write = 0; + + // Iterate through each WSABUF structure and write data from it + for wsabuf in wsabufs.iter() { + // Safely create a Vec from the WSABUF pointer, then pass it to the write function + let (res, _) = write( + fd.clone(), + ManuallyDrop::new(unsafe { + Vec::from_raw_parts(wsabuf.buf, wsabuf.len as usize, wsabuf.len as usize) + }), + ) + .await; + + // Handle the result of the write operation + match res { + Ok(bytes_write) => { + total_bytes_write += bytes_write; + // If fewer bytes were written than requested, stop further writes + if bytes_write < wsabuf.len as usize { + break; + } + } + Err(e) => { + // If an error occurs, return it along with the original buffer vector + return (Err(e), buf_vec); + } + } + } + + // Return the total bytes written and the buffer vector + (Ok(total_bytes_write), buf_vec) + } +} + +#[cfg(all(not(feature = "iouring"), feature = "sync"))] +mod asyncified { + use super::*; + use crate::{ + asyncify_op, + driver::op::{read, write}, + fs::asyncify, + }; + + asyncify_op!(R, read(read::read, IoBufMut::write_ptr, IoBufMut::bytes_total)); + asyncify_op!(R, read_at(read::read_at, IoBufMut::write_ptr, IoBufMut::bytes_total, pos: u64)); + + asyncify_op!(W, write(write::write, IoBuf::read_ptr, IoBuf::bytes_init)); + asyncify_op!(W, write_at(write::write_at, IoBuf::read_ptr, IoBuf::bytes_init, pos: u64)); + + /// The `readv` implement on windows. + /// + /// Due to windows does not have syscall like `readv`, so we need to simulate it by ourself. + /// + /// This function is just to fill each buffer by calling the `read` function. + pub(crate) async fn read_vectored( + fd: SharedFd, + mut buf_vec: T, + ) -> crate::BufResult { + // Convert the mutable buffer vector into raw pointers that can be used in unsafe + // operations + let raw_bufs = buf_vec.write_wsabuf_ptr() as usize; + let len = buf_vec.write_wsabuf_len(); + let fd = fd.as_raw_handle() as _; + + let res = asyncify(move || { + // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with + // ManuallyDrop + let wasbufs = ManuallyDrop::new(unsafe { + Vec::from_raw_parts(raw_bufs as *mut WSABUF, len, len) + }); + + let mut total_bytes_read = 0; + + // Iterate through each WSABUF structure and read data into it + for wsabuf in wasbufs.iter() { + let res = read::read(fd, wsabuf.buf, wsabuf.len as usize); + + // Handle the result of the read operation + match res { + Ok(bytes_read) => { + total_bytes_read += bytes_read; + // If fewer bytes were read than requested, stop further reads + if bytes_read < wsabuf.len { + break; + } + } + Err(e) => { + // If an error occurs, return it along with the original buffer vector + return Err(e); + } + } + } + + // Due to `read` will init each buffer, so we do need to set buffer len here. + // Return the total bytes read and the buffer vector + Ok(total_bytes_read) + }) + .await + .map(|n| n as usize); + + unsafe { buf_vec.set_init(*res.as_ref().unwrap_or(&0)) }; + + (res, buf_vec) + } + + /// The `writev` implement on windows + /// + /// Due to windows does not have syscall like `writev`, so we need to simulate it by ourself. + /// + /// This function is just to write each buffer into file by calling the `write` function. + pub(crate) async fn write_vectored( + fd: SharedFd, + buf_vec: T, + ) -> crate::BufResult { + // Convert the mutable buffer vector into raw pointers that can be used in unsafe + // operation + let raw_bufs = buf_vec.read_wsabuf_ptr() as usize; + let len = buf_vec.read_wsabuf_len(); + let fd = fd.as_raw_handle() as _; + + let res = asyncify(move || { + // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with + // ManuallyDrop + let wsabufs = ManuallyDrop::new(unsafe { + Vec::from_raw_parts(raw_bufs as *mut WSABUF, len, len) + }); + + let mut total_bytes_write = 0; + + for wsabuf in wsabufs.iter() { + let res = write::write(fd, wsabuf.buf, wsabuf.len as _); + + match res { + Ok(bytes_write) => { + total_bytes_write += bytes_write; + if bytes_write < wsabuf.len { + break; + } + } + Err(e) => return Err(e), + } + } + + Ok(total_bytes_write) + }) + .await + .map(|n| n as usize); + + (res, buf_vec) + } +} diff --git a/monoio/src/fs/mod.rs b/monoio/src/fs/mod.rs index 6776116..7d5fd62 100644 --- a/monoio/src/fs/mod.rs +++ b/monoio/src/fs/mod.rs @@ -86,6 +86,18 @@ where } } +/// A macro that generates the some Op-call functions. +#[cfg(any(feature = "iouring", not(feature = "sync")))] +#[macro_export] +macro_rules! uring_op { + ($fn_name:ident<$trait_name:ident>($op_name: ident, $buf_name:ident $(, $pos:ident: $pos_type:ty)?)) => { + pub(crate) async fn $fn_name(fd: SharedFd, $buf_name: T, $($pos: $pos_type)?) -> $crate::BufResult { + let op = Op::$op_name(fd, $buf_name, $($pos)?).unwrap(); + op.result().await + } + }; +} + /// A macro that generates an asynchronous I/O operation function, offloading a blocking /// system call to a separate thread using the `asyncify` function. /// @@ -95,7 +107,7 @@ where #[cfg(all(feature = "sync", not(feature = "iouring")))] #[macro_export] macro_rules! asyncify_op { - ($fn_name:ident<$Trait: ident>($read_op:expr, $buf_ptr_expr:expr, $len_expr:expr $(, $extra_param:ident : $typ: ty)?) $(,)?) => { + (R, $fn_name:ident<$Trait: ident>($op:expr, $buf_ptr_expr:expr, $len_expr:expr $(, $extra_param:ident : $typ: ty)?)) => { pub(crate) async fn $fn_name( fd: SharedFd, mut buf: T, @@ -111,12 +123,37 @@ macro_rules! asyncify_op { let buf_ptr = $buf_ptr_expr(&mut buf) as usize; let len = $len_expr(&mut buf); - let res = $crate::fs::asyncify(move || $read_op(fd, buf_ptr as *mut _, len, $($extra_param)?)) + let res = $crate::fs::asyncify(move || $op(fd, buf_ptr as *mut _, len, $($extra_param)?)) .await .map(|n| n as usize); unsafe { buf.set_init(*res.as_ref().unwrap_or(&0)) }; + (res, buf) + } + }; + (W, $fn_name:ident<$Trait: ident>($op:expr, $buf_ptr_expr:expr, $len_expr:expr $(, $extra_param:ident : $typ: ty)?)) => { + pub(crate) async fn $fn_name( + fd: SharedFd, + mut buf: T, + $($extra_param: $typ)? + ) -> $crate::BufResult { + #[cfg(unix)] + let fd = fd.as_raw_fd(); + #[cfg(windows)] + let fd = fd.as_raw_handle() as _; + // Safety: Due to the trait `IoBuf*/IoVecBuf*` require the implemet of `*_ptr` + // should return the same address, it should be safe to convert it to `usize` + // and then convert back. + let buf_ptr = $buf_ptr_expr(&mut buf) as usize; + let len = $len_expr(&mut buf); + + let res = $crate::fs::asyncify(move || $op(fd, buf_ptr as *mut _, len, $($extra_param)?)) + .await + .map(|n| n as usize); + + // unsafe { buf.set_init(*res.as_ref().unwrap_or(&0)) }; + (res, buf) } } diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index f3466d9..d9909f1 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -127,7 +127,7 @@ async fn basic_write() { } #[monoio::test_all] -async fn write_vectored() { +async fn basic_write_vectored() { let tempfile = tempfile(); let mut file = File::create(tempfile.path()).await.unwrap();