Skip to content

Commit

Permalink
Refactor filter shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 10, 2024
1 parent a940756 commit 568df1c
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 110 deletions.
29 changes: 6 additions & 23 deletions ntex-io/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,16 @@ impl Filter for Base {

#[inline]
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
let mut flags = self.0.flags();
let flags = self.0.flags();

if flags.is_stopped() {
Poll::Ready(WriteStatus::Terminate)
} else {
self.0 .0.write_task.register(cx.waker());

if flags.intersects(Flags::IO_STOPPING) {
Poll::Ready(WriteStatus::Shutdown(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.contains(Flags::IO_STOPPING_FILTERS)
&& !flags.contains(Flags::IO_FILTERS_TIMEOUT)
{
flags.insert(Flags::IO_FILTERS_TIMEOUT);
self.0.set_flags(flags);
Poll::Ready(WriteStatus::Timeout(
self.0 .0.disconnect_timeout.get().into(),
))
} else if flags.intersects(Flags::WR_PAUSED) {
if flags.contains(Flags::IO_STOPPING) {
Poll::Ready(WriteStatus::Shutdown)
} else if flags.contains(Flags::WR_PAUSED) {
Poll::Pending
} else {
Poll::Ready(WriteStatus::Ready)
Expand Down Expand Up @@ -242,20 +232,13 @@ where
Poll::Pending => Poll::Pending,
Poll::Ready(WriteStatus::Ready) => res2,
Poll::Ready(WriteStatus::Terminate) => Poll::Ready(WriteStatus::Terminate),
Poll::Ready(WriteStatus::Shutdown(t)) => {
Poll::Ready(WriteStatus::Shutdown) => {
if res2 == Poll::Ready(WriteStatus::Terminate) {
Poll::Ready(WriteStatus::Terminate)
} else {
Poll::Ready(WriteStatus::Shutdown(t))
Poll::Ready(WriteStatus::Shutdown)

Check warning on line 239 in ntex-io/src/filter.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/filter.rs#L239

Added line #L239 was not covered by tests
}
}
Poll::Ready(WriteStatus::Timeout(t)) => match res2 {
Poll::Ready(WriteStatus::Terminate) => Poll::Ready(WriteStatus::Terminate),
Poll::Ready(WriteStatus::Shutdown(t)) => {
Poll::Ready(WriteStatus::Shutdown(t))
}
_ => Poll::Ready(WriteStatus::Timeout(t)),
},
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions ntex-io/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ bitflags::bitflags! {
const IO_STOPPING = 0b0000_0000_0000_0010;
/// shuting down filters
const IO_STOPPING_FILTERS = 0b0000_0000_0000_0100;
/// initiate filters shutdown timeout in write task
const IO_FILTERS_TIMEOUT = 0b0000_0000_0000_1000;

/// pause io read
const RD_PAUSED = 0b0000_0000_0001_0000;
Expand Down
6 changes: 0 additions & 6 deletions ntex-io/src/ioref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ impl IoRef {
self.0.flags.get()
}

#[inline]
/// Set flags
pub(crate) fn set_flags(&self, flags: Flags) {
self.0.flags.set(flags)
}

#[inline]
/// Get current filter
pub(crate) fn filter(&self) -> &dyn Filter {
Expand Down
7 changes: 2 additions & 5 deletions ntex-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ mod utils;

use ntex_bytes::BytesVec;
use ntex_codec::{Decoder, Encoder};
use ntex_util::time::Millis;

pub use self::buf::{ReadBuf, WriteBuf};
pub use self::dispatcher::{Dispatcher, DispatcherConfig};
Expand Down Expand Up @@ -64,10 +63,8 @@ pub enum ReadStatus {
pub enum WriteStatus {
/// Write task is clear to proceed with write operation
Ready,
/// Initiate timeout for normal write operations, shutdown connection after timeout
Timeout(Millis),
/// Initiate graceful io shutdown operation with timeout
Shutdown(Millis),
/// Initiate graceful io shutdown operation
Shutdown,
/// Immediately terminate connection
Terminate,
}
Expand Down
131 changes: 57 additions & 74 deletions ntex-io/src/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use std::{future::poll_fn, io, task::Poll};
use std::{cell::Cell, fmt, future::poll_fn, io, task::Context, task::Poll};

use ntex_bytes::{BufMut, BytesVec};
use ntex_util::{future::select, future::Either, time::sleep};
use ntex_util::{future::lazy, future::select, future::Either, time::sleep, time::Sleep};

use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus};

#[derive(Debug)]
/// Context for io read task
pub struct ReadContext(IoRef);
pub struct ReadContext(IoRef, Cell<Option<Sleep>>);

impl fmt::Debug for ReadContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadContext").field("io", &self.0).finish()
}

Check warning on line 14 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L12-L14

Added lines #L12 - L14 were not covered by tests
}

impl ReadContext {
pub(crate) fn new(io: &IoRef) -> Self {
Self(io.clone())
Self(io.clone(), Cell::new(None))
}

#[inline]
Expand All @@ -30,7 +35,7 @@ impl ReadContext {
} else {
self.0 .0.read_task.register(cx.waker());
if flags.contains(Flags::IO_STOPPING_FILTERS) {
shutdown_filters(&self.0);
self.shutdown_filters(cx);
}
Poll::Pending
}
Expand Down Expand Up @@ -149,7 +154,7 @@ impl ReadContext {
}
Ok(_) => {
if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
shutdown_filters(&self.0);
lazy(|cx| self.shutdown_filters(cx)).await;
}
}
Err(err) => {
Expand All @@ -160,6 +165,48 @@ impl ReadContext {
}
}
}

fn shutdown_filters(&self, cx: &mut Context<'_>) {
let st = &self.0 .0;
let filter = self.0.filter();

match filter.shutdown(&self.0, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
let flags = st.flags.get();

// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);

Check warning on line 187 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L185-L187

Added lines #L185 - L187 were not covered by tests
} else {
// filter shutdown timeout
let timeout = self
.1
.take()
.unwrap_or_else(|| sleep(st.disconnect_timeout.get()));
if timeout.poll_elapsed(cx).is_ready() {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);

Check warning on line 196 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L195-L196

Added lines #L195 - L196 were not covered by tests
} else {
self.1.set(Some(timeout));
}
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) {
st.io_stopped(Some(err));

Check warning on line 207 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L207

Added line #L207 was not covered by tests
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -212,41 +259,13 @@ impl WriteContext {
where
T: AsyncWrite,
{
let inner = &self.0 .0;
let mut delay = None;
let mut buf = WriteContextBuf {
io: self.0.clone(),
buf: None,
};

loop {
// check readiness
let result = if let Some(ref mut sleep) = delay {
let result = match select(sleep, self.ready()).await {
Either::Left(_) => {
self.close(Some(io::Error::new(
io::ErrorKind::TimedOut,
"Operation timedout",
)));
return;
}
Either::Right(res) => res,
};
delay = None;
result
} else {
self.ready().await
};

// running
let mut flags = inner.flags.get();
if flags.contains(Flags::WR_PAUSED) {
flags.remove(Flags::WR_PAUSED);
inner.flags.set(flags);
}

// handle write
match result {
match self.ready().await {
WriteStatus::Ready => {
// write io stream
match select(io.write(&mut buf), self.when_stopped()).await {
Expand All @@ -255,12 +274,7 @@ impl WriteContext {
Either::Right(_) => return,
}
}
WriteStatus::Timeout(time) => {
log::trace!("{}: Initiate timeout delay for {:?}", self.tag(), time);
delay = Some(sleep(time));
continue;
}
WriteStatus::Shutdown(time) => {
WriteStatus::Shutdown => {
log::trace!("{}: Write task is instructed to shutdown", self.tag());

let fut = async {
Expand All @@ -270,7 +284,7 @@ impl WriteContext {
io.shutdown().await?;
Ok(())
};
match select(sleep(time), fut).await {
match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await {
Either::Left(_) => self.close(None),

Check warning on line 288 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L288

Added line #L288 was not covered by tests
Either::Right(res) => self.close(res.err()),
}
Expand Down Expand Up @@ -328,34 +342,3 @@ impl WriteContextBuf {
}
}
}

fn shutdown_filters(io: &IoRef) {
let st = &io.0;
let flags = st.flags.get();

if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let filter = io.filter();
match filter.shutdown(io, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
st.io_stopped(Some(err));
}
}
}

0 comments on commit 568df1c

Please sign in to comment.