diff --git a/ntex-server/CHANGES.md b/ntex-server/CHANGES.md index 33ca163e..c01352e4 100644 --- a/ntex-server/CHANGES.md +++ b/ntex-server/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.5.0] - 2024-11-04 + +* Use updated Service trait + ## [2.3.0] - 2024-07-16 * Add Server to TestServer diff --git a/ntex-server/Cargo.toml b/ntex-server/Cargo.toml index 2ed93e58..a21943b2 100644 --- a/ntex-server/Cargo.toml +++ b/ntex-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-server" -version = "2.4.0" +version = "2.5.0" authors = ["ntex contributors "] description = "Server for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1" ntex-net = "2" -ntex-service = "3" +ntex-service = "3.3" ntex-rt = "0.4" ntex-util = "2" diff --git a/ntex-server/src/net/counter.rs b/ntex-server/src/net/counter.rs index cc6d14b8..87dbe2af 100644 --- a/ntex-server/src/net/counter.rs +++ b/ntex-server/src/net/counter.rs @@ -1,4 +1,4 @@ -use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll}; +use std::{cell::Cell, future::poll_fn, rc::Rc, task}; use ntex_util::task::LocalWaker; @@ -30,14 +30,29 @@ impl Counter { CounterGuard::new(self.0.clone()) } + pub(crate) fn is_available(&self) -> bool { + self.0.count.get() < self.0.capacity + } + /// Check if counter is not at capacity. If counter at capacity /// it registers notification for current task. - pub(super) async fn available(&self) { + pub(crate) async fn available(&self) { + poll_fn(|cx| { + if self.0.available(cx) { + task::Poll::Ready(()) + } else { + task::Poll::Pending + } + }) + .await + } + + pub(crate) async fn unavailable(&self) { poll_fn(|cx| { if self.0.available(cx) { - Poll::Ready(()) + task::Poll::Pending } else { - Poll::Pending + task::Poll::Ready(()) } }) .await @@ -72,7 +87,11 @@ impl Drop for CounterGuard { impl CounterInner { fn inc(&self) { - self.count.set(self.count.get() + 1); + let num = self.count.get() + 1; + self.count.set(num); + if num == self.capacity { + self.task.wake(); + } } fn dec(&self) { @@ -84,11 +103,7 @@ impl CounterInner { } fn available(&self, cx: &mut task::Context<'_>) -> bool { - if self.count.get() < self.capacity { - true - } else { - self.task.register(cx.waker()); - false - } + self.task.register(cx.waker()); + self.count.get() < self.capacity } } diff --git a/ntex-server/src/net/factory.rs b/ntex-server/src/net/factory.rs index 92fd5f95..6165f355 100644 --- a/ntex-server/src/net/factory.rs +++ b/ntex-server/src/net/factory.rs @@ -148,6 +148,7 @@ where type Response = (); type Error = (); + ntex_service::forward_notready!(inner); ntex_service::forward_shutdown!(inner); async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), ()> { diff --git a/ntex-server/src/net/service.rs b/ntex-server/src/net/service.rs index a3cef6ea..b67df07d 100644 --- a/ntex-server/src/net/service.rs +++ b/ntex-server/src/net/service.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::{fmt, future::poll_fn, future::Future, pin::Pin, task::Poll}; use ntex_bytes::{Pool, PoolRef}; use ntex_net::Io; @@ -152,25 +152,48 @@ impl Service for StreamServiceImpl { type Error = (); async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { - self.conns.available().await; + if !self.conns.is_available() { + self.conns.available().await; + } for (idx, svc) in self.services.iter().enumerate() { - match ctx.ready(svc).await { - Ok(()) => (), - Err(_) => { - for (idx_, tag, _, _) in self.tokens.values() { - if idx == *idx_ { - log::error!("{}: Service readiness has failed", tag); - break; - } + if ctx.ready(svc).await.is_err() { + for (idx_, tag, _, _) in self.tokens.values() { + if idx == *idx_ { + log::error!("{}: Service readiness has failed", tag); + break; } - return Err(()); } + return Err(()); } } Ok(()) } + #[inline] + async fn not_ready(&self) { + if self.conns.is_available() { + let mut futs: Vec<_> = self + .services + .iter() + .map(|s| Box::pin(s.not_ready())) + .collect(); + + ntex_util::future::select( + self.conns.unavailable(), + poll_fn(move |cx| { + for f in &mut futs { + if Pin::new(f).poll(cx).is_ready() { + return Poll::Ready(()); + } + } + Poll::Pending + }), + ) + .await; + } + } + async fn shutdown(&self) { let _ = join_all(self.services.iter().map(|svc| svc.shutdown())).await; log::info!( diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index cede1379..81484097 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -64,14 +64,14 @@ brotli = ["dep:brotli2"] ntex-codec = "0.6.2" ntex-http = "0.1.12" ntex-router = "0.5.3" -ntex-service = "3.1" +ntex-service = "3.3" ntex-macros = "0.1.3" -ntex-util = "2" +ntex-util = "2.5" ntex-bytes = "0.1.27" -ntex-server = "2.4" +ntex-server = "2.5" ntex-h2 = "1.2" ntex-rt = "0.4.19" -ntex-io = "2.7" +ntex-io = "2.8" ntex-net = "2.4" ntex-tls = "2.1"