From 8f9601d421ae0de530284dcab8682f3a2b5d0816 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 6 Sep 2024 15:43:15 +0500 Subject: [PATCH] Stop write task if io is closed --- ntex-io/Cargo.toml | 2 +- ntex-io/src/filter.rs | 35 ++++++++++++++++++----------------- ntex-io/src/ioref.rs | 4 ++++ ntex-io/src/tasks.rs | 11 +++++++++++ ntex-net/Cargo.toml | 4 ++-- ntex-tokio/CHANGES.md | 4 ++++ ntex-tokio/Cargo.toml | 6 +++--- ntex-tokio/src/io.rs | 14 ++++++++++++++ ntex/Cargo.toml | 2 +- ntex/src/http/builder.rs | 2 +- 10 files changed, 59 insertions(+), 25 deletions(-) diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 1f47fe291..798cd3396 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.3.1" +version = "2.4.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index 05f57b4fe..f74e057c2 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -97,25 +97,26 @@ impl Filter for Base { if flags.contains(Flags::IO_STOPPED) { Poll::Ready(WriteStatus::Terminate) - } else if flags.intersects(Flags::IO_STOPPING) { - Poll::Ready(WriteStatus::Shutdown( - self.0 .0.disconnect_timeout.get().into(), - )) - } else if flags.contains(Flags::IO_STOPPING_FILTERS) - && !flags.contains(Flags::IO_FILTERS_TIMEOUT) - { - flags.insert(Flags::IO_FILTERS_TIMEOUT); - self.0.set_flags(flags); - self.0 .0.write_task.register(cx.waker()); - Poll::Ready(WriteStatus::Timeout( - self.0 .0.disconnect_timeout.get().into(), - )) - } else if flags.intersects(Flags::WR_PAUSED) { - self.0 .0.write_task.register(cx.waker()); - Poll::Pending } else { self.0 .0.write_task.register(cx.waker()); - Poll::Ready(WriteStatus::Ready) + + if flags.intersects(Flags::IO_STOPPING) { + Poll::Ready(WriteStatus::Shutdown( + self.0 .0.disconnect_timeout.get().into(), + )) + } else if flags.contains(Flags::IO_STOPPING_FILTERS) + && !flags.contains(Flags::IO_FILTERS_TIMEOUT) + { + flags.insert(Flags::IO_FILTERS_TIMEOUT); + self.0.set_flags(flags); + Poll::Ready(WriteStatus::Timeout( + self.0 .0.disconnect_timeout.get().into(), + )) + } else if flags.intersects(Flags::WR_PAUSED) { + Poll::Pending + } else { + Poll::Ready(WriteStatus::Ready) + } } } diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 02dc33766..340c03f54 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -41,6 +41,10 @@ impl IoRef { .intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) } + pub(crate) fn is_io_closed(&self) -> bool { + self.0.flags.get().intersects(Flags::IO_STOPPED) + } + #[inline] /// Check if write back-pressure is enabled pub fn is_wr_backpressure(&self) -> bool { diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index e8e253328..e8a14c2eb 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -296,6 +296,17 @@ impl WriteContext { self.0.filter().poll_write_ready(cx) } + #[inline] + /// Check if io is closed + pub fn poll_close(&self, cx: &mut Context<'_>) -> Poll<()> { + if self.0.is_io_closed() { + Poll::Ready(()) + } else { + self.0 .0.write_task.register(cx.waker()); + Poll::Pending + } + } + /// Get write buffer pub fn with_buf(&self, f: F) -> Poll> where diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 33d99009c..c53b555eb 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -34,11 +34,11 @@ async-std = ["ntex-rt/async-std", "ntex-async-std"] ntex-service = "3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.3" +ntex-io = "2.4" ntex-rt = "0.4.14" ntex-util = "2" -ntex-tokio = { version = "0.5", optional = true } +ntex-tokio = { version = "0.5.1", optional = true } ntex-compio = { version = "0.1", optional = true } ntex-glommio = { version = "0.5", optional = true } ntex-async-std = { version = "0.5", optional = true } diff --git a/ntex-tokio/CHANGES.md b/ntex-tokio/CHANGES.md index 397bb2b6c..ceaa64a87 100644 --- a/ntex-tokio/CHANGES.md +++ b/ntex-tokio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.5.1] - 2024-09-06 + +* Stop write task if io is closed + ## [0.4.0] - 2024-01-09 * Log io tags diff --git a/ntex-tokio/Cargo.toml b/ntex-tokio/Cargo.toml index 9bffa23d5..b741cff97 100644 --- a/ntex-tokio/Cargo.toml +++ b/ntex-tokio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tokio" -version = "0.5.0" +version = "0.5.1" authors = ["ntex contributors "] description = "tokio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1" -ntex-io = "2.0" -ntex-util = "2.0" +ntex-io = "2.4" +ntex-util = "2" log = "0.4" tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] } diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index 76a7ea0f7..06f1656ad 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -137,6 +137,10 @@ impl Future for WriteTask { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().get_mut(); + if this.state.poll_close(cx).is_ready() { + return Poll::Ready(()); + } + match this.st { IoWriteState::Processing(ref mut delay) => { match ready!(this.state.poll_ready(cx)) { @@ -215,6 +219,9 @@ impl Future for WriteTask { // close WRITE side and wait for disconnect on read side. // use disconnect timeout, otherwise it could hang forever. loop { + if this.state.poll_close(cx).is_ready() { + return Poll::Ready(()); + } match st { Shutdown::None => { // flush write buffer @@ -564,6 +571,10 @@ mod unixstream { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().get_mut(); + if this.state.poll_close(cx).is_ready() { + return Poll::Ready(()); + } + match this.st { IoWriteState::Processing(ref mut delay) => { match this.state.poll_ready(cx) { @@ -630,6 +641,9 @@ mod unixstream { // close WRITE side and wait for disconnect on read side. // use disconnect timeout, otherwise it could hang forever. loop { + if this.state.poll_close(cx).is_ready() { + return Poll::Ready(()); + } match st { Shutdown::None => { // flush write buffer diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index de7bb0165..46e88c430 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -71,7 +71,7 @@ ntex-bytes = "0.1.27" ntex-server = "2.3" ntex-h2 = "1.1" ntex-rt = "0.4.15" -ntex-io = "2.3" +ntex-io = "2.4" ntex-net = "2.1" ntex-tls = "2.1" diff --git a/ntex/src/http/builder.rs b/ntex/src/http/builder.rs index b2613c30e..e27b13faf 100644 --- a/ntex/src/http/builder.rs +++ b/ntex/src/http/builder.rs @@ -85,7 +85,7 @@ where /// /// To disable timeout set value to 0. /// - /// By default disconnect timeout is set to 3 seconds. + /// By default disconnect timeout is set to 1 seconds. pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self { self.config.disconnect_timeout(timeout); self