From 162cc7a1052da426226b02ac78bbdc77050c0533 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 26 Aug 2024 14:36:18 +0500 Subject: [PATCH] Various fixes --- ntex-compio/src/io.rs | 73 ++++++++++++++++++---------------- ntex-io/src/io.rs | 13 ++++++ ntex-io/src/tasks.rs | 7 +++- ntex/src/http/h1/dispatcher.rs | 6 ++- ntex/src/web/test.rs | 1 + ntex/tests/http_server.rs | 2 - 6 files changed, 63 insertions(+), 39 deletions(-) diff --git a/ntex-compio/src/io.rs b/ntex-compio/src/io.rs index 310760009..c97886085 100644 --- a/ntex-compio/src/io.rs +++ b/ntex-compio/src/io.rs @@ -171,33 +171,7 @@ async fn write_task(mut io: T, state: &WriteContext) { match result { WriteStatus::Ready => { // write io stream - let result = state - .with_buf_async(|buf| async { - let mut buf = CompioBuf(buf); - loop { - let BufResult(result, buf1) = io.write(buf).await; - buf = buf1; - - match result { - Ok(size) => { - if buf.0.len() == size { - return io.flush().await; - } - if size == 0 { - return Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - )); - } - buf.0.advance(size); - } - Err(e) => return Err(e), - } - } - }) - .await; - - match result { + match write(&mut io, state).await { Ok(()) => continue, Err(e) => { state.close(Some(e)); @@ -212,13 +186,15 @@ async fn write_task(mut io: T, state: &WriteContext) { WriteStatus::Shutdown(time) => { log::trace!("{}: Write task is instructed to shutdown", state.tag()); - if let Err(err) = io.flush().await { - state.close(Some(err)); - } else { - match select(sleep(time), io.shutdown()).await { - Either::Left(_) => state.close(None), - Either::Right(res) => state.close(res.err()), - } + let fut = async { + write(&mut io, state).await?; + io.flush().await?; + io.shutdown().await?; + Ok(()) + }; + match select(sleep(time), fut).await { + Either::Left(_) => state.close(None), + Either::Right(res) => state.close(res.err()), } } WriteStatus::Terminate => { @@ -229,3 +205,32 @@ async fn write_task(mut io: T, state: &WriteContext) { break; } } + +// write to io stream +async fn write(io: &mut T, state: &WriteContext) -> io::Result<()> { + state + .with_buf_async(|buf| async { + let mut buf = CompioBuf(buf); + loop { + let BufResult(result, buf1) = io.write(buf).await; + buf = buf1; + + match result { + Ok(size) => { + if buf.0.len() == size { + return io.flush().await; + } + if size == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + )); + } + buf.0.advance(size); + } + Err(e) => return Err(e), + } + } + }) + .await +} diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index 15d5ce0ff..398ae0a80 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -398,6 +398,19 @@ impl Io { } } + #[doc(hidden)] + #[inline] + /// Resume read task + pub fn resume(&self) { + let st = self.st(); + let mut flags = st.flags.get(); + if flags.contains(Flags::RD_PAUSED) && !flags.contains(Flags::RD_BUF_FULL) { + flags.remove(Flags::RD_PAUSED | Flags::RD_READY); + st.flags.set(flags); + st.read_task.wake(); + } + } + #[inline] /// Encode item, send to the peer. Fully flush write buffer. pub async fn send( diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 821aa9a9f..68e0be890 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -180,15 +180,18 @@ impl ReadContext { .and_then(|status| { if status.nbytes > 0 { // dest buffer has new data, wake up dispatcher - if inner.buffer.read_destination_size() >= hw { + let dest_len = inner.buffer.read_destination_size(); + if dest_len >= hw { log::trace!( "{}: Io read buffer is too large {}, enable read back-pressure", self.0.tag(), total ); inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL | Flags::RD_PAUSED); - } else if nbytes < hw { + } else if dest_len > 0 { inner.insert_flags(Flags::RD_READY | Flags::RD_PAUSED); + } else { + inner.read_task.wake(); } log::trace!( "{}: New {} bytes available, wakeup dispatcher", diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 244853b87..c9d43e667 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -421,7 +421,11 @@ where } } else { // check for io changes, could close while waiting for service call - match ready!(self.io.poll_status_update(cx)) { + let result = self.io.poll_status_update(cx); + if result.is_pending() { + self.io.resume(); + } + match ready!(result) { IoStatusUpdate::KeepAlive => Poll::Pending, IoStatusUpdate::Stop | IoStatusUpdate::PeerGone(_) => { Poll::Ready(self.stop()) diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index ca64e0e5f..d561dff1e 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -693,6 +693,7 @@ where }, } .unwrap() + .set_tag("test", "WEB-SRV") .run(); tx.send((System::current(), srv, local_addr)).unwrap(); diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index a91f041c6..445125006 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -703,8 +703,6 @@ impl Drop for SetOnDrop { #[ntex::test] async fn test_h1_client_drop() -> io::Result<()> { - //let _ = env_logger::try_init(); - let count = Arc::new(AtomicUsize::new(0)); let count2 = count.clone();