Skip to content

Commit

Permalink
various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 26, 2024
1 parent 24a17dc commit 2c56024
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 39 deletions.
73 changes: 39 additions & 34 deletions ntex-compio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,33 +171,7 @@ async fn write_task<T: AsyncWrite>(mut io: T, state: &WriteContext) {
match result {

Check warning on line 171 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L171

Added line #L171 was not covered by tests
WriteStatus::Ready => {
// write io stream
let result = state
.with_buf_async(|buf| async {
let mut buf = CompioBuf(buf);
loop {
let BufResult(result, buf1) = io.write(buf).await;
buf = buf1;

match result {
Ok(size) => {
if buf.0.len() == size {
return io.flush().await;
}
if size == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
));
}
buf.0.advance(size);
}
Err(e) => return Err(e),
}
}
})
.await;

match result {
match write(&mut io, state).await {
Ok(()) => continue,
Err(e) => {
state.close(Some(e));
Expand All @@ -212,13 +186,15 @@ async fn write_task<T: AsyncWrite>(mut io: T, state: &WriteContext) {
WriteStatus::Shutdown(time) => {
log::trace!("{}: Write task is instructed to shutdown", state.tag());

Check warning on line 187 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L186-L187

Added lines #L186 - L187 were not covered by tests

if let Err(err) = io.flush().await {
state.close(Some(err));
} else {
match select(sleep(time), io.shutdown()).await {
Either::Left(_) => state.close(None),
Either::Right(res) => state.close(res.err()),
}
let fut = async {
write(&mut io, state).await?;
io.flush().await?;
io.shutdown().await?;
Ok(())
};
match select(sleep(time), fut).await {
Either::Left(_) => state.close(None),
Either::Right(res) => state.close(res.err()),

Check warning on line 197 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L189-L197

Added lines #L189 - L197 were not covered by tests
}
}
WriteStatus::Terminate => {
Expand All @@ -229,3 +205,32 @@ async fn write_task<T: AsyncWrite>(mut io: T, state: &WriteContext) {
break;

Check warning on line 205 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L205

Added line #L205 was not covered by tests
}
}

Check warning on line 207 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L207

Added line #L207 was not covered by tests

// write to io stream
async fn write<T: AsyncWrite>(io: &mut T, state: &WriteContext) -> io::Result<()> {
state
.with_buf_async(|buf| async {
let mut buf = CompioBuf(buf);

Check warning on line 213 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L210-L213

Added lines #L210 - L213 were not covered by tests
loop {
let BufResult(result, buf1) = io.write(buf).await;
buf = buf1;

match result {
Ok(size) => {
if buf.0.len() == size {
return io.flush().await;
}
if size == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
));
}
buf.0.advance(size);

Check warning on line 229 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L215-L229

Added lines #L215 - L229 were not covered by tests
}
Err(e) => return Err(e),

Check warning on line 231 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L231

Added line #L231 was not covered by tests
}
}
})
.await
}

Check warning on line 236 in ntex-compio/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-compio/src/io.rs#L234-L236

Added lines #L234 - L236 were not covered by tests
14 changes: 14 additions & 0 deletions ntex-io/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,20 @@ impl<F> Io<F> {
}
}

#[doc(hidden)]
#[inline]
/// Resume read task
pub fn resume(&self) {
let st = self.st();
let mut flags = st.flags.get();
if flags.contains(Flags::RD_PAUSED) && !flags.contains(Flags::RD_BUF_FULL) {
flags.remove(Flags::RD_PAUSED);
flags.remove(Flags::RD_READY);
st.flags.set(flags);
st.read_task.wake();

Check warning on line 411 in ntex-io/src/io.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/io.rs#L408-L411

Added lines #L408 - L411 were not covered by tests
}
}

#[inline]
/// Encode item, send to the peer. Fully flush write buffer.
pub async fn send<U>(
Expand Down
7 changes: 5 additions & 2 deletions ntex-io/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,18 @@ impl ReadContext {
.and_then(|status| {
if status.nbytes > 0 {

Check warning on line 181 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L169-L181

Added lines #L169 - L181 were not covered by tests
// dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
let dest_len = inner.buffer.read_destination_size();
if dest_len >= hw {
log::trace!(
"{}: Io read buffer is too large {}, enable read back-pressure",
self.0.tag(),

Check warning on line 187 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L183-L187

Added lines #L183 - L187 were not covered by tests
total
);
inner.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL | Flags::RD_PAUSED);
} else if nbytes < hw {
} else if dest_len > 0 {
inner.insert_flags(Flags::RD_READY | Flags::RD_PAUSED);
} else {
inner.read_task.wake();
}
log::trace!(
"{}: New {} bytes available, wakeup dispatcher",
Expand Down
6 changes: 5 additions & 1 deletion ntex/src/http/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,11 @@ where
}
} else {
// check for io changes, could close while waiting for service call
match ready!(self.io.poll_status_update(cx)) {
let result = self.io.poll_status_update(cx);
if result.is_pending() {
self.io.resume();
}
match ready!(result) {
IoStatusUpdate::KeepAlive => Poll::Pending,
IoStatusUpdate::Stop | IoStatusUpdate::PeerGone(_) => {
Poll::Ready(self.stop())
Expand Down
1 change: 1 addition & 0 deletions ntex/src/web/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ where
},
}
.unwrap()
.set_tag("test", "WEB-SRV")
.run();

tx.send((System::current(), srv, local_addr)).unwrap();
Expand Down
2 changes: 0 additions & 2 deletions ntex/tests/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,6 @@ impl Drop for SetOnDrop {

#[ntex::test]
async fn test_h1_client_drop() -> io::Result<()> {
//let _ = env_logger::try_init();

let count = Arc::new(AtomicUsize::new(0));
let count2 = count.clone();

Expand Down

0 comments on commit 2c56024

Please sign in to comment.