Skip to content

Commit

Permalink
fix many unsound futures
Browse files Browse the repository at this point in the history
  • Loading branch information
oscartbeaumont committed Jul 19, 2023
1 parent 2e237f7 commit f115ab2
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/internal/exec/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ impl<
for _ in 0..conn.streams.len() {
match ready!(conn.streams.as_mut().poll_next(cx)) {
Some((a, _)) => match a {
StreamYield::Item(batch) => {
this.batch.as_mut().insert(batch);
StreamYield::Item(resp) => {
this.batch.as_mut().insert(resp);
return PollResult::QueueSend.into();
}
StreamYield::Finished(f) => {
Expand Down
2 changes: 1 addition & 1 deletion src/internal/exec/owned_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ mod private {
pub struct OwnedStream<TCtx> {
arc: Arc<BuiltRouter<TCtx>>,
#[pin]
pub(crate) reference: Pin<Box<dyn Stream<Item = Result<Value, ExecError>> + Send>>,
reference: Pin<Box<dyn Stream<Item = Result<Value, ExecError>> + Send>>,
pub id: u32,
}
}
Expand Down
30 changes: 21 additions & 9 deletions src/internal/exec/stream_or_fut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mod private {
#[pin]
fut: ExecRequestFut,
},
// When the underlying stream shutdowns we yield a shutdown message. Once it is yielded we need to yield a `None` to tell the poller we are done.
PendingDone,
Done,
}
}
Expand All @@ -35,20 +37,19 @@ mod private {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project() {
StreamOrFutProj::OwnedStream { stream } => {
let s = stream.project();

Poll::Ready(Some(match ready!(s.reference.poll_next(cx)) {
StreamOrFutProj::OwnedStream { mut stream } => {
Poll::Ready(Some(match ready!(stream.as_mut().poll_next(cx)) {
Some(r) => exec::Response {
id: *s.id,
id: stream.id,
inner: match r {
Ok(v) => exec::ResponseInner::Value(v),
Err(err) => exec::ResponseInner::Error(err.into()),
},
},
None => {
let id = *s.id;
self.set(StreamOrFut::Done);
let id = stream.id;
cx.waker().wake_by_ref(); // No wakers set so we set one
self.set(StreamOrFut::PendingDone);
exec::Response {
id,
inner: exec::ResponseInner::Complete,
Expand All @@ -57,10 +58,21 @@ mod private {
}))
}
StreamOrFutProj::ExecRequestFut { fut } => fut.poll(cx).map(|v| {
self.set(StreamOrFut::Done);
cx.waker().wake_by_ref(); // No wakers set so we set one
self.set(StreamOrFut::PendingDone);
Some(v)
}),
StreamOrFutProj::Done { .. } => Poll::Ready(None),
StreamOrFutProj::PendingDone => {
self.set(StreamOrFut::Done);
Poll::Ready(None)
}
StreamOrFutProj::Done => {
#[cfg(debug_assertions)]
panic!("`StreamOrFut` polled after completion");

#[cfg(not(debug_assertions))]
Poll::Ready(None)
}
}
}
}
Expand Down
131 changes: 91 additions & 40 deletions src/internal/middleware/middleware_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod private {
internal::{
middleware::Middleware,
middleware::{Executable2, MiddlewareContext, MwV2Result, RequestContext},
Layer, SealedLayer,
Layer, PinnedOption, PinnedOptionProj, SealedLayer,
},
ExecError,
};
Expand Down Expand Up @@ -49,7 +49,7 @@ mod private {
},
);

Ok(MiddlewareLayerFuture::First {
Ok(MiddlewareLayerFuture::Resolve {
fut,
next: &self.next,
})
Expand All @@ -60,82 +60,133 @@ mod private {
pub trait SendSyncStatic: Send + Sync + 'static {}
impl<T: Send + Sync + 'static> SendSyncStatic for T {}

// TODO: Cleanup generics on this
// TODO: Document phases
pin_project! {
#[project = MiddlewareLayerFutureProj]
pub enum MiddlewareLayerFuture<
'a,
// TODO: Remove one of these Ctx's and get from `TMiddleware` or `TNextMiddleware`
TLayerCtx: SendSyncStatic,
TNewMiddleware: Middleware<TLayerCtx>,
TMiddleware: Layer<TNewMiddleware::NewCtx>,
TMiddleware: Middleware<TLayerCtx>,
TNextLayer: Layer<TMiddleware::NewCtx>,
> {
First {
// We are waiting for the current middleware to run and yield it's result.
// Remember the middleware only runs once for an entire stream as it returns "instructions" on how to map the stream from then on.
Resolve {
// Future of the currently middleware.
// It's result will populate the `resp_fn` field for the next phase.
#[pin]
fut: TNewMiddleware::Fut,
next: &'a TMiddleware,
fut: TMiddleware::Fut,

// The next layer in the middleware chain
// This could be another middleware of the users resolver. It will be called to yield the `stream` for the next phase.
next: &'a TNextLayer,
},
Second {
// We are in this state where we are executing the current middleware on the stream
Execute {
// The actual data stream from the resolver function or next middleware
#[pin]
stream: TMiddleware::Stream<'a>,
resp: Option<<TNewMiddleware::Result as MwV2Result>::Resp>,
},
Third {
stream: TNextLayer::Stream<'a>,

// The currently executing future returned by the `resp_fn` (publicly `.map`) function
// Be aware this will go `None` -> `Some` -> `None`, etc for a subscription
#[pin]
fut: <<TNewMiddleware::Result as MwV2Result>::Resp as Executable2>::Fut,
resp_fut: PinnedOption<<<TMiddleware::Result as MwV2Result>::Resp as Executable2>::Fut>,
// The `.map` function returned by the user from the execution of the current middleware
// This allows a middleware to map the values being returned from the stream
resp_fn: Option<<TMiddleware::Result as MwV2Result>::Resp>,
},
// The stream is internally done but it returned `Poll::Ready` for the shutdown message so the caller thinks it's still active
// This will yield `Poll::Ready(None)` and transition into the `Self::Done` phase.
PendingDone,
// Stream is completed and will panic if polled again
Done,
}
}

impl<
'a,
TLayerCtx: Send + Sync + 'static,
TNewMiddleware: Middleware<TLayerCtx>,
TMiddleware: Layer<TNewMiddleware::NewCtx>,
> Stream for MiddlewareLayerFuture<'a, TLayerCtx, TNewMiddleware, TMiddleware>
TMiddleware: Middleware<TLayerCtx>,
TNextLayer: Layer<TMiddleware::NewCtx>,
> Stream for MiddlewareLayerFuture<'a, TLayerCtx, TMiddleware, TNextLayer>
{
type Item = Result<Value, ExecError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let new_value = match self.as_mut().project() {
MiddlewareLayerFutureProj::First { fut, next } => {
match self.as_mut().project() {
MiddlewareLayerFutureProj::Resolve { fut, next } => {
let result = ready!(fut.poll(cx));

let (ctx, input, req, resp) = result.explode()?;
let (ctx, input, req, resp_fn) = result.explode()?;

match next.call(ctx, input, req) {
Ok(stream) => Self::Second { stream, resp },
Err(err) => return Poll::Ready(Some(Err(err))),
Ok(stream) => {
self.as_mut().set(Self::Execute {
resp_fut: PinnedOption::None,
stream,
resp_fn,
});
}

Err(err) => {
cx.waker().wake_by_ref(); // No wakers set so we set one
self.as_mut().set(Self::PendingDone);
return Poll::Ready(Some(Err(err)));
}
}
}
MiddlewareLayerFutureProj::Second { stream, resp } => {
let result = ready!(stream.poll_next(cx));

let Some(resp) = resp.take() else {
return Poll::Ready(result);
};
MiddlewareLayerFutureProj::Execute {
mut stream,
mut resp_fut,
resp_fn,
} => {
if let PinnedOptionProj::Some { v } = resp_fut.as_mut().project() {
let result = ready!(v.poll(cx));
cx.waker().wake_by_ref(); // No wakers set so we set one
resp_fut.set(PinnedOption::None);
return Poll::Ready(Some(Ok(result)));
}

match result {
Some(Ok(value)) => Self::Third {
fut: resp.call(value),
match ready!(stream.as_mut().poll_next(cx)) {
Some(result) => match resp_fn {
Some(resp_fn) => match result {
Ok(result) => {
resp_fut.set(PinnedOption::Some {
v: (&*resp_fn).call(result),
});
continue;
}
// TODO: The `.map` function is skipped for errors. Maybe it should be possible to map them when desired?
Err(err) => return Poll::Ready(Some(Err(err))),
},

// No `.map` fn so we return the result as is
None => return Poll::Ready(Some(result)),
},
result => return Poll::Ready(result),
// The underlying stream has shutdown so we will too
None => {
self.as_mut().set(Self::Done);
return Poll::Ready(None);
}
}
}
MiddlewareLayerFutureProj::Third { fut } => {
return fut.poll(cx).map(|result| Some(Ok(result)))
MiddlewareLayerFutureProj::PendingDone => {
self.as_mut().set(Self::Done);
return Poll::Ready(None);
}
};
MiddlewareLayerFutureProj::Done => {
#[cfg(debug_assertions)]
panic!("`MiddlewareLayerFuture` polled after completion");

self.as_mut().set(new_value);
#[cfg(not(debug_assertions))]
return Poll::Ready(None);
}
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
match &self {
Self::Second { stream: c, .. } => c.size_hint(),
Self::Execute { stream: c, .. } => c.size_hint(),
_ => (0, None),
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/internal/middleware/mw_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ impl<TRet: Ret, TFut: Future<Output = TRet> + Send + 'static> Fut<TRet> for TFut
pub trait Executable2: Send + Sync + 'static {
type Fut: Future<Output = Value> + Send;

fn call(self, v: Value) -> Self::Fut;
fn call(&self, v: Value) -> Self::Fut;
}

impl<TFut: Fut<Value>, TFunc: FnOnce(Value) -> TFut + Send + Sync + 'static> Executable2 for TFunc {
impl<TFut: Fut<Value>, TFunc: Fn(Value) -> TFut + Send + Sync + 'static> Executable2 for TFunc {
type Fut = TFut;

fn call(self, v: Value) -> Self::Fut {
fn call(&self, v: Value) -> Self::Fut {
(self)(v)
}
}
Expand All @@ -37,7 +37,7 @@ pub struct Executable2Placeholder {}
impl Executable2 for Executable2Placeholder {
type Fut = Ready<Value>;

fn call(self, _: Value) -> Self::Fut {
fn call(&self, _: Value) -> Self::Fut {
unreachable!();
}
}
Expand Down
28 changes: 16 additions & 12 deletions src/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@ pub(crate) use procedure_store::*;
pub use resolver_function::*;
pub use resolver_result::*;

pin_project_lite::pin_project! {
#[project = PinnedOptionProj]
pub(crate) enum PinnedOption<T> {
Some {
#[pin]
v: T,
},
None,
mod private {
pin_project_lite::pin_project! {
#[project = PinnedOptionProj]
pub enum PinnedOption<T> {
Some {
#[pin]
v: T,
},
None,
}
}
}

impl<T> From<T> for PinnedOption<T> {
fn from(value: T) -> Self {
Self::Some { v: value }
impl<T> From<T> for PinnedOption<T> {
fn from(value: T) -> Self {
Self::Some { v: value }
}
}
}

pub(crate) use private::{PinnedOption, PinnedOptionProj};

#[cfg(test)]
mod tests {
use std::{fs::File, io::Write, path::PathBuf};
Expand Down

0 comments on commit f115ab2

Please sign in to comment.